diff --git a/README.md b/README.md index b08290d..310ca9f 100644 --- a/README.md +++ b/README.md @@ -36,15 +36,12 @@ advantage of. - [x] Goto - [x] Gosub - [x] Exception handling -- [x] Terminal library ([Blessed]) - - [x] Process-isolated `Terminal` to circumvent atomic `TERM` -- [ ] UI components - - [x] Block editor - - [x] Line editor (block editor with a single line) - - [ ] Horizontal lightbar - - [ ] Vertical lightbar - - [ ] Matrix (vertical/horizontal lightbar) - - [ ] Panel (scrollable boundary) +- [x] Terminal library ([rich]) + - [x] Adapt for SSH session usage +- [ ] UI components ([textual]) + - [x] Adapt for SSH session usage + - [ ] Block editor + - [ ] Line editor (block editor with a single line) diff --git a/requirements/dev.txt b/requirements/dev.txt index 5169dc7..e5fe791 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -49,7 +49,7 @@ identify==2.5.22 # via pre-commit idna==3.4 # via requests -importlib-metadata==6.1.0 +importlib-metadata==6.8.0 # via # keyring # twine @@ -92,7 +92,7 @@ pre-commit==3.2.1 # via -r dev.in pycparser==2.21 # via cffi -pygments==2.15.0 +pygments==2.16.1 # via readme-renderer pyproject-hooks==1.0.0 # via build @@ -144,7 +144,7 @@ wheel==0.40.0 # via # pip-tools # python-semantic-release -zipp==3.15.0 +zipp==3.17.0 # via importlib-metadata # The following packages are considered to be unsafe in a requirements file: diff --git a/requirements/requirements.in b/requirements/requirements.in index adf1cb0..a04960f 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -3,9 +3,10 @@ Click aiofiles asyncssh bcrypt -blessed gino redis +rich +textual toml uvicorn[standard] uvloop diff --git a/requirements/requirements.txt b/requirements/requirements.txt index d4f08ad..8506250 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -22,8 +22,6 @@ asyncssh==2.13.1 # via -r requirements.in bcrypt==4.0.1 # via -r requirements.in -blessed==1.20.0 - # via -r requirements.in blinker==1.6.2 # via flask cffi==1.15.1 @@ -52,10 +50,19 @@ httptools==0.5.0 # via uvicorn idna==3.4 # via anyio +importlib-metadata==6.8.0 + # via textual itsdangerous==2.1.2 # via flask jinja2==3.1.2 # via flask +linkify-it-py==2.0.2 + # via markdown-it-py +markdown-it-py[linkify,plugins]==3.0.0 + # via + # mdit-py-plugins + # rich + # textual markupsafe==2.1.2 # via # jinja2 @@ -64,6 +71,10 @@ marshmallow==3.19.0 # via # flask-marshmallow # webargs +mdit-py-plugins==0.4.0 + # via markdown-it-py +mdurl==0.1.2 + # via markdown-it-py packaging==23.0 # via # apispec @@ -71,24 +82,40 @@ packaging==23.0 # webargs pycparser==2.21 # via cffi +pygments==2.16.1 + # via rich python-dotenv==1.0.0 # via uvicorn pyyaml==6.0.1 # via uvicorn redis==4.5.4 # via -r requirements.in -six==1.16.0 +rich==13.6.0 # via - # blessed - # flask-marshmallow + # -r requirements.in + # textual +six==1.16.0 + # via flask-marshmallow sniffio==1.3.0 # via anyio sqlalchemy==1.3.24 # via gino +textual==0.38.1 + # via -r requirements.in toml==0.10.2 # via -r requirements.in +tree-sitter==0.20.2 + # via + # textual + # tree-sitter-languages +tree-sitter-languages==1.7.0 + # via textual typing-extensions==4.5.0 - # via asyncssh + # via + # asyncssh + # textual +uc-micro-py==1.0.2 + # via linkify-it-py uvicorn[standard]==0.21.1 # via -r requirements.in uvloop==0.17.0 @@ -97,8 +124,6 @@ uvloop==0.17.0 # uvicorn watchfiles==0.19.0 # via uvicorn -wcwidth==0.2.6 - # via blessed webargs==8.2.0 # via apiflask websockets==10.4 @@ -107,3 +132,5 @@ werkzeug==2.3.3 # via flask wrapt==1.15.0 # via -r requirements.in +zipp==3.17.0 + # via importlib-metadata diff --git a/userland/artwork/login.ans b/userland/artwork/login.ans index 2a7b893..9d870bc 100644 --- a/userland/artwork/login.ans +++ b/userland/artwork/login.ans @@ -1,3 +1,14 @@ - -^(;,,;)^ - + ..:::.. +____ :::''''': ________ + ____\ \  ::::|¯¯¯:::  >>_  .:::, +,::. _.:: > \_ `:::| |:' seventy nine  /  <\____/_ ::::: +`::'.::::'/  /\___/____``| |___ ______ ______\_____  / ``' +  .:::'_// ____/_ _¯¯VV_ \> \`:::. +::::.\\ > /  '  _'/  /\`:::: +::::::.\______X\ ____| ,.__ _.,___V__.,_/  X_______/.::::: +::::::........\_____\ |___: ..¯....... /____/... ....::::: +`:::::::::::::..  ,::. ....::::::::::: .:.., .::'  `^:::^' ,. +,. `::::::::::::'  `::' `:::::::::::::' ::::: .:::,`' +`'  `^^++^^`  ,..:::,`:::::::::::'  ``'  ,. ::::: +`'  ::::: `^:::::^' `'  ``'  hX! +``' diff --git a/userland/artwork/oneliners.ans b/userland/artwork/oneliners.ans new file mode 100644 index 0000000..0a03cea --- /dev/null +++ b/userland/artwork/oneliners.ans @@ -0,0 +1,9 @@ +:%###########################%:| |:%#########################################%: + ______ ______ _____ | | ___ ______ _____ _________ _____ +___/|__/ \_< ___/___| |._\ _)__/\_< ___/___\___\< __/___ +\_. | ._ / _/ /| || | ._ / _/ / _// \__ / + \ \| | |/ /| V / | || | |/ /| V /| \ _/ V / + \_______l___<____\|________\ | ||____|___<____\|________\|___\___\|________\ +| |_____________________________________________ +:%########################hX!%:|_______________________________________________ + diff --git a/userland/models/oneliner.py b/userland/models/oneliner.py index d4c41f8..a955ea9 100644 --- a/userland/models/oneliner.py +++ b/userland/models/oneliner.py @@ -4,7 +4,7 @@ from datetime import datetime # 3rd party -from sqlalchemy import Column, DateTime, ForeignKey, Integer, String +from sqlalchemy import Column, DateTime, ForeignKey, Integer, Unicode # api from xthulu.models import User @@ -27,7 +27,7 @@ class Oneliner(db.Model): ) """User who left the oneliner""" - message = Column(String(78)) + message = Column(Unicode(78)) """The oneliner message""" timestamp = Column(DateTime(), default=datetime.utcnow) diff --git a/userland/scripts/editor_demo.py b/userland/scripts/editor_demo.py deleted file mode 100644 index af978e3..0000000 --- a/userland/scripts/editor_demo.py +++ /dev/null @@ -1,79 +0,0 @@ -"""Editor demo script""" - -# 3rd party -from blessed.keyboard import Keystroke - -# api -from xthulu.ssh.context import SSHContext -from xthulu.ssh.ui.editors import BlockEditor -from xthulu.ssh.terminal.constants import CTRL_C - -# local -from userland.handle_events import handle_events - - -async def main(cx: SSHContext): - banner = ( - cx.term.bright_white_on_magenta_underline(" Editor demo "), - "\r\n\r\n", - ) - cx.echo(*("\r\n\r\n", *banner)) - - # demonstration text - text_to_repeat = [ - "one " * 200, - "two " * 200, - "three " * 200, - "four " * 200, - "five " * 200, - "six " * 200, - "seven " * 200, - "", - ] - text: list[str] = [] - - for _ in range(10): - for line in text_to_repeat: - text.append(line.strip()) - - editor = BlockEditor( - term=cx.term, - rows=10, - columns=cx.term.width - 1, - pos=[0, 0], - cursor=[0, 0], - value=text, - ) - - # if the editor should be redrawn - dirty = True - - while True: - if dirty: - cx.echo(editor.redraw(anchor=False)) - dirty = False - - ks: Keystroke | None = None - - while not ks: - _, dirty = handle_events(cx) - - if dirty: - editor.columns = cx.term.width - 1 - editor.cursor[0] = min(editor.cursor[0], editor.columns - 1) - cx.echo(*(cx.term.clear(), *banner)) - - break - - ks = await cx.term.inkey(1) - - if ks is None: - continue - - if ks.code == cx.term.KEY_ESCAPE or ks == CTRL_C: - # move cursor after the last row of the editor before exiting - out = (cx.term.normal, "\r\n" * (editor.rows - editor.cursor[1])) - - return cx.echo(*out) - - cx.echo(editor.process_keystroke(ks)) diff --git a/userland/scripts/lock_example.py b/userland/scripts/lock_example.py index 84017f9..47c7a52 100644 --- a/userland/scripts/lock_example.py +++ b/userland/scripts/lock_example.py @@ -4,29 +4,25 @@ from asyncio import sleep # api +from xthulu.ssh.console.input import wait_for_key from xthulu.ssh.context import SSHContext async def main(cx: SSHContext): - cx.echo( - cx.term.normal, - "\r\n\r\n", - cx.term.bright_white_on_yellow_underline(" Shared locks demo "), - "\r\n\r\n", - ) + cx.echo("\n\n[bright_white on yellow underline] Shared locks demo [/]\n\n") with cx.lock("testing") as l: if l: - if cx.encoding == "utf-8": - cx.echo("🔒 ") + lock = "🔒 " if cx.encoding == "utf-8" else "" - cx.echo("Lock acquired; press any key to release\r\n") - await cx.term.inkey() + await wait_for_key( + cx, f"{lock}Lock acquired; press any key to release" + ) if cx.encoding == "utf-8": cx.echo("🔥 ") - cx.echo("Lock released!\r\n") + cx.echo("Lock released!\n") await sleep(1) return @@ -34,4 +30,5 @@ async def main(cx: SSHContext): if cx.encoding == "utf-8": cx.echo("⌠") - cx.echo("Failed to acquire lock\r\n") + cx.echo("Failed to acquire lock\n") + await sleep(2) diff --git a/userland/scripts/oneliners.py b/userland/scripts/oneliners.py index d4c4e81..fbda08f 100644 --- a/userland/scripts/oneliners.py +++ b/userland/scripts/oneliners.py @@ -1,122 +1,133 @@ """Oneliners script""" +# stdlib +from math import floor + +# 3rd party +from rich.text import Text +from textual import events +from textual.validation import Length +from textual.widgets import Input, Label, ListItem, ListView + # api +from xthulu.resources import Resources +from xthulu.ssh.console.app import XthuluApp +from xthulu.ssh.console.art import load_art from xthulu.ssh.context import SSHContext -from xthulu.ssh.ui.editors import LineEditor -from xthulu.ssh.terminal.constants import CTRL_C # local -from userland.handle_events import handle_events from userland.models import Oneliner LIMIT = 200 -"""Total number of oneliners to keep""" - -DISPLAY_LIMIT = 10 -"""Number of oneliners to display on screen""" - - -async def main(cx: SSHContext): - async def get_oneliners(): - recent = ( - Oneliner.select("id") - .order_by(Oneliner.id.desc()) - .limit(LIMIT) - .alias("recent") - .select() +"""Total number of oneliners to load""" + + +class OnlinersApp(XthuluApp): + artwork: list[str] + banner: Label + error_message: Label + oneliners: list[Oneliner] + + def __init__( + self, + context: SSHContext, + oneliners: list[Oneliner], + artwork: list[str], + **kwargs, + ): + self.artwork = artwork + self.oneliners = oneliners + super().__init__(context, **kwargs) + + def _update_banner(self): + padded = [] + pad_left = " " * floor(self.context.term.width / 2 - 40) + + for line in self.artwork: + padded += [pad_left, line] + + text = Text.from_ansi("".join(padded), overflow="crop", end="") + self.banner.update(text) + + def compose(self): + # banner + self.banner = Label(markup=False) + + if self.console.height < len(self.artwork) + 10: + self.banner.visible = False + else: + self._update_banner() + + yield self.banner + + # oneliners + list = ListView( + *[ListItem(Label(o.message)) for o in self.oneliners], + initial_index=len(self.oneliners) - 1, + ) + list.scroll_end(animate=False) + yield list + + # error message + self.error_message = Label(id="err") + self.error_message.visible = False + yield self.error_message + + # input + input_widget = Input( + placeholder="Enter a oneliner or press ESC", + validators=Length( + maximum=78, + failure_description="Too long; must be <= 78 characters", + ), + validate_on=("submitted",), ) - oneliners = await Oneliner.query.where( - Oneliner.id.in_(recent) - ).gino.all() - count = len(oneliners) - offset = max(0, count - DISPLAY_LIMIT) - - return oneliners, count, offset - - def display_oneliners(): - for ol in oneliners[offset : offset + DISPLAY_LIMIT]: - cx.echo( - cx.term.clear_eol(), - cx.term.move_x(0), - ol.message[: cx.term.width - 1], - "\r\n", + input_widget.focus() + yield input_widget + + async def on_input_submitted(self, event: Input.Submitted) -> None: + if event.validation_result and not event.validation_result.is_valid: + message = "".join( + ( + " ", + "... ".join(event.validation_result.failure_descriptions), + ) ) + self.error_message.update(message) + self.error_message.visible = True + return - def done(): - cx.echo("\r\n") - - banner = (cx.term.bright_white_on_cyan_underline(" Oneliners "), "\r\n\r\n") - cx.echo(*("\r\n", *banner)) - oneliners, count, offset = await get_oneliners() - first = True - editor = LineEditor(cx.term, cx.term.width - 1, limit=79) - - while True: - if not first: - cx.echo(cx.term.move_x(0)) - up = max(0, min(count - 1, DISPLAY_LIMIT)) - - if up > 0: - cx.echo(cx.term.move_up(up)) - - display_oneliners() - first = False - dirty = True - - while True: - if dirty: - cx.echo(cx.term.move_x(0) + editor.redraw()) - dirty = False - - ks = None + val = event.input.value.strip() - while not ks: - _, dirty = handle_events(cx) + if val != "": + await Oneliner.create(message=val, user_id=self.context.user.id) - if dirty: - editor.columns = cx.term.width - 1 - editor.cursor[0] = min(editor.cursor[0], editor.columns) - cx.echo(*(cx.term.clear(), *banner)) - display_oneliners() + self.exit() - break + async def on_key(self, event: events.Key) -> None: + if event.key == "escape": + self.exit() - ks = await cx.term.inkey(1) + def on_resize(self, event: events.Resize) -> None: + if event.size.height < len(self.artwork) + 10: + self.banner.update("") + self.banner.visible = False + else: + self._update_banner() + self.banner.visible = True - if ks is None: - continue - if ks.code == cx.term.KEY_UP: - last = offset - offset = max(0, offset - 1) - - if last > 0 and last != offset: - break - - continue - - if ks.code == cx.term.KEY_DOWN: - last = offset - offset = min(count - DISPLAY_LIMIT, offset + 1) - - if count > DISPLAY_LIMIT and last != offset: - break - - continue - - if ks.code == cx.term.KEY_ESCAPE or ks == CTRL_C: - return done() - - if ks.code == cx.term.KEY_ENTER: - val = editor.value[0].strip() - - if len(val) == 0: - return done() - - await Oneliner.create(user_id=cx.user.id, message=val) - oneliners, count, offset = await get_oneliners() - editor.reset() - - break +async def main(cx: SSHContext): + db = Resources().db + oneliners = [ + oneliner + for oneliner in reversed( + await db.all( + Oneliner.query.order_by(Oneliner.id.desc()).limit(LIMIT) + ), + ) + ] - cx.echo(editor.process_keystroke(ks)) + artwork = await load_art("userland/artwork/oneliners.ans", "amiga") + app = OnlinersApp(cx, oneliners, artwork) + await app.run_async() diff --git a/userland/scripts/oneliners.tcss b/userland/scripts/oneliners.tcss new file mode 100644 index 0000000..1f49dab --- /dev/null +++ b/userland/scripts/oneliners.tcss @@ -0,0 +1,8 @@ +Label { + width: 100%; +} + +#err { + background: #a00; + color: #fff; +} diff --git a/userland/scripts/top.py b/userland/scripts/top.py index 2ceb224..d30fbab 100644 --- a/userland/scripts/top.py +++ b/userland/scripts/top.py @@ -1,11 +1,12 @@ """Userland entry point""" -# stdlib -from asyncio import sleep +# 3rd party +from rich.progress import track # api +from xthulu.ssh.console.art import scroll_art +from xthulu.ssh.console.input import wait_for_key from xthulu.ssh.context import SSHContext -from xthulu.ssh.ui import show_art async def main(cx: SSHContext): @@ -14,33 +15,26 @@ async def main(cx: SSHContext): elif cx.env["TERM"] != "ansi": cx.echo("\x1b%@\x1b(U") + await scroll_art(cx, "userland/artwork/login.ans", "amiga") + await wait_for_key(cx, "Press any key to continue", "arc") cx.echo( - cx.term.normal, - "\r\n", "💀 " if cx.encoding == "utf-8" else "", - cx.term.bright_green("x"), - cx.term.green("thulu"), - " terminal server ", - cx.term.italic("v1.0.0a0"), - "\r\n", - cx.term.bright_black("https://github.com/haliphax/xthulu"), - "\r\n\r\n", - cx.term.bright_white("Connecting: "), - cx.term.bright_cyan_underline(cx.user.name), - "@", - cx.term.cyan(cx.ip), - " ", + "[bold bright_green]x[/][green]thulu[/] ", + "terminal server [italic]v1.0.0a0[/]\n", + "[bright_black]https://github.com/haliphax/xthulu[/]\n\n", ) - for color in ("bright_black", "white", "bright_white"): - colorfunc = getattr(cx.term, color) - await sleep(0.5) - cx.echo(colorfunc(".")) + bar_text = "".join( + [ + "[bright_white]Connecting:[/] ", + f"[bright_cyan underline]{cx.user.name}[/]", + f"@[cyan]{cx.ip}[/]", + ] + ) - await sleep(0.5) - cx.echo(f"{cx.term.normal}\r\n") - await show_art(cx, "userland/artwork/login.ans") + for _ in track(sequence=range(20), description=bar_text, console=cx.term): + if await wait_for_key(cx, timeout=0.1): + break await cx.gosub("oneliners") await cx.gosub("lock_example") - await cx.gosub("editor_demo") diff --git a/xthulu/ssh/codecs/__init__.py b/xthulu/ssh/codecs/__init__.py index faf2b86..2047d15 100644 --- a/xthulu/ssh/codecs/__init__.py +++ b/xthulu/ssh/codecs/__init__.py @@ -4,13 +4,14 @@ from codecs import decode, register # local -from . import cp437 +from . import amiga, cp437 def register_encodings(): """Register encodings to be used by the system.""" _encodings = { + "amiga": amiga.getregentry(), "cp437": cp437.getregentry(), } @@ -22,5 +23,8 @@ def _search_function(encoding: str): register(_search_function) - for c in ("cp437",): + for c in ( + "amiga", + "cp437", + ): decode(bytes((27,)), c) diff --git a/xthulu/ssh/codecs/amiga.py b/xthulu/ssh/codecs/amiga.py new file mode 100644 index 0000000..c8fb931 --- /dev/null +++ b/xthulu/ssh/codecs/amiga.py @@ -0,0 +1,372 @@ +""" +"Amiga" (Topaz, etc.) font codec + +https://github.com/jquast/x84/blob/master/x84/encodings/amiga.py +""" + +import codecs + + +class Codec(codecs.Codec): + def encode(self, char, errors="strict"): + raise NotImplementedError() + + def decode(self, char, errors="strict"): + return codecs.charmap_decode( + char, errors, DECODING_TABLE # type: ignore + ) + + +class IncrementalEncoder(codecs.IncrementalEncoder): + def encode(self, char, final=False): + raise NotImplementedError() + + +class IncrementalDecoder(codecs.IncrementalDecoder): + def decode(self, char, final=False): + return codecs.charmap_decode( + char, self.errors, DECODING_TABLE # type: ignore + )[0] + + +class StreamWriter(Codec, codecs.StreamWriter): + pass + + +class StreamReader(Codec, codecs.StreamReader): + pass + + +def getaliases(): + return ( + "amiga", + "microknight", + "mosoul", + "p0tnoodle", + "topaz", + "topaz1", + "topaz1plus", + "topaz2", + "topaz2plus", + "topazplus", + ) + + +def getregentry(): + return codecs.CodecInfo( + name="amiga", + encode=Codec().encode, # type: ignore + decode=Codec().decode, # type: ignore + incrementalencoder=IncrementalEncoder, + incrementaldecoder=IncrementalDecoder, + streamreader=StreamReader, + streamwriter=StreamWriter, + ) + + +decoding_map = codecs.make_identity_dict(range(256)) # type: ignore +decoding_map.update( + { + 0x002D: 0x2500, # BOX DRAWINGS LIGHT HORIZONTAL + 0x002F: 0x2571, # BOX DRAWINGS LIGHT DIAGONAL UPPER RIGHT TO LOWER LEFT + 0x0058: 0x2573, # BOX DRAWINGS LIGHT DIAGONAL CROSS + 0x005C: 0x2572, # BOX DRAWINGS LIGHT DIAGONAL UPPER LEFT TO LOWER RIGHT + 0x005F: 0x2581, # LOWER ONE EIGHTH BLOCK + 0x007C: 0x2502, # BOX DRAWINGS LIGHT VERTICAL + 0x007F: 0x259E, # QUADRANT UPPER RIGHT AND LOWER LEFT + 0x0080: 0x2B1C, # WHITE LARGE SQUARE + 0x0081: 0x2B1C, # WHITE LARGE SQUARE + 0x0082: 0x2B1C, # WHITE LARGE SQUARE + 0x0083: 0x2B1C, # WHITE LARGE SQUARE + 0x0084: 0x2B1C, # WHITE LARGE SQUARE + 0x0085: 0x2B1C, # WHITE LARGE SQUARE + 0x0086: 0x2B1C, # WHITE LARGE SQUARE + 0x0087: 0x2B1C, # WHITE LARGE SQUARE + 0x0088: 0x2B1C, # WHITE LARGE SQUARE + 0x0089: 0x2B1C, # WHITE LARGE SQUARE + 0x008A: 0x2B1C, # WHITE LARGE SQUARE + 0x008B: 0x2B1C, # WHITE LARGE SQUARE + 0x008C: 0x2B1C, # WHITE LARGE SQUARE + 0x008D: 0x2B1C, # WHITE LARGE SQUARE + 0x008E: 0x2B1C, # WHITE LARGE SQUARE + 0x008F: 0x2B1C, # WHITE LARGE SQUARE + 0x0090: 0x2B1C, # WHITE LARGE SQUARE + 0x0091: 0x2B1C, # WHITE LARGE SQUARE + 0x0092: 0x2B1C, # WHITE LARGE SQUARE + 0x0093: 0x2B1C, # WHITE LARGE SQUARE + 0x0094: 0x2B1C, # WHITE LARGE SQUARE + 0x0095: 0x2B1C, # WHITE LARGE SQUARE + 0x0096: 0x2B1C, # WHITE LARGE SQUARE + 0x0097: 0x2B1C, # WHITE LARGE SQUARE + 0x0098: 0x2B1C, # WHITE LARGE SQUARE + 0x0099: 0x2B1C, # WHITE LARGE SQUARE + 0x009A: 0x2B1C, # WHITE LARGE SQUARE + 0x009B: 0x2B1C, # WHITE LARGE SQUARE + 0x009C: 0x2B1C, # WHITE LARGE SQUARE + 0x009D: 0x2B1C, # WHITE LARGE SQUARE + 0x009E: 0x2B1C, # WHITE LARGE SQUARE + 0x009F: 0x2B1C, # WHITE LARGE SQUARE + 0x00AF: 0x2594, # UPPER ONE EIGHTH BLOCK + } +) + +DECODING_TABLE = ( + "\x00" # 0x0000 -> NULL + "\x01" # 0x0001 -> START OF HEADING + "\x02" # 0x0002 -> START OF TEXT + "\x03" # 0x0003 -> END OF TEXT + "\x04" # 0x0004 -> END OF TRANSMISSION + "\x05" # 0x0005 -> ENQUIRY + "\x06" # 0x0006 -> ACKNOWLEDGE + "\x07" # 0x0007 -> BELL + "\x08" # 0x0008 -> BACKSPACE + "\t" # 0x0009 -> HORIZONTAL TABULATION + "\n" # 0x000a -> LINE FEED + "\x0b" # 0x000b -> VERTICAL TABULATION + "\x0c" # 0x000c -> FORM FEED + "\r" # 0x000d -> CARRIAGE RETURN + "\x0e" # 0x000e -> SHIFT OUT + "\x0f" # 0x000f -> SHIFT IN + "\x10" # 0x0010 -> DATA LINK ESCAPE + "\x11" # 0x0011 -> DEVICE CONTROL ONE + "\x12" # 0x0012 -> DEVICE CONTROL TWO + "\x13" # 0x0013 -> DEVICE CONTROL THREE + "\x14" # 0x0014 -> DEVICE CONTROL FOUR + "\x15" # 0x0015 -> NEGATIVE ACKNOWLEDGE + "\x16" # 0x0016 -> SYNCHRONOUS IDLE + "\x17" # 0x0017 -> END OF TRANSMISSION BLOCK + "\x18" # 0x0018 -> CANCEL + "\x19" # 0x0019 -> END OF MEDIUM + "\x1a" # 0x001a -> SUBSTITUTE + "\x1b" # 0x001b -> ESCAPE + "\x1c" # 0x001c -> FILE SEPARATOR + "\x1d" # 0x001d -> GROUP SEPARATOR + "\x1e" # 0x001e -> RECORD SEPARATOR + "\x1f" # 0x001f -> UNIT SEPARATOR + " " # 0x0020 -> SPACE + "!" # 0x0021 -> EXCLAMATION MARK + '"' # 0x0022 -> QUOTATION MARK + "#" # 0x0023 -> NUMBER SIGN + "$" # 0x0024 -> DOLLAR SIGN + "%" # 0x0025 -> PERCENT SIGN + "&" # 0x0026 -> AMPERSAND + "'" # 0x0027 -> APOSTROPHE + "(" # 0x0028 -> LEFT PARENTHESIS + ")" # 0x0029 -> RIGHT PARENTHESIS + "*" # 0x002a -> ASTERISK + "+" # 0x002b -> PLUS SIGN + "," # 0x002c -> COMMA + "\u2500" # 0x002d -> BOX DRAWINGS LIGHT HORIZONTAL + "." # 0x002e -> FULL STOP + # 0x002f -> BOX DRAWINGS LIGHT DIAGONAL UPPER RIGHT TO LOWER LEFT + "\u2571" + "0" # 0x0030 -> DIGIT ZERO + "1" # 0x0031 -> DIGIT ONE + "2" # 0x0032 -> DIGIT TWO + "3" # 0x0033 -> DIGIT THREE + "4" # 0x0034 -> DIGIT FOUR + "5" # 0x0035 -> DIGIT FIVE + "6" # 0x0036 -> DIGIT SIX + "7" # 0x0037 -> DIGIT SEVEN + "8" # 0x0038 -> DIGIT EIGHT + "9" # 0x0039 -> DIGIT NINE + ":" # 0x003a -> COLON + ";" # 0x003b -> SEMICOLON + "<" # 0x003c -> LESS-THAN SIGN + "=" # 0x003d -> EQUALS SIGN + ">" # 0x003e -> GREATER-THAN SIGN + "?" # 0x003f -> QUESTION MARK + "@" # 0x0040 -> COMMERCIAL AT + "A" # 0x0041 -> LATIN CAPITAL LETTER A + "B" # 0x0042 -> LATIN CAPITAL LETTER B + "C" # 0x0043 -> LATIN CAPITAL LETTER C + "D" # 0x0044 -> LATIN CAPITAL LETTER D + "E" # 0x0045 -> LATIN CAPITAL LETTER E + "F" # 0x0046 -> LATIN CAPITAL LETTER F + "G" # 0x0047 -> LATIN CAPITAL LETTER G + "H" # 0x0048 -> LATIN CAPITAL LETTER H + "I" # 0x0049 -> LATIN CAPITAL LETTER I + "J" # 0x004a -> LATIN CAPITAL LETTER J + "K" # 0x004b -> LATIN CAPITAL LETTER K + "L" # 0x004c -> LATIN CAPITAL LETTER L + "M" # 0x004d -> LATIN CAPITAL LETTER M + "N" # 0x004e -> LATIN CAPITAL LETTER N + "O" # 0x004f -> LATIN CAPITAL LETTER O + "P" # 0x0050 -> LATIN CAPITAL LETTER P + "Q" # 0x0051 -> LATIN CAPITAL LETTER Q + "R" # 0x0052 -> LATIN CAPITAL LETTER R + "S" # 0x0053 -> LATIN CAPITAL LETTER S + "T" # 0x0054 -> LATIN CAPITAL LETTER T + "U" # 0x0055 -> LATIN CAPITAL LETTER U + "V" # 0x0056 -> LATIN CAPITAL LETTER V + "W" # 0x0057 -> LATIN CAPITAL LETTER W + "\u2573" # 0x0058 -> BOX DRAWINGS LIGHT DIAGONAL CROSS + "Y" # 0x0059 -> LATIN CAPITAL LETTER Y + "Z" # 0x005a -> LATIN CAPITAL LETTER Z + "[" # 0x005b -> LEFT SQUARE BRACKET + # 0x005c -> BOX DRAWINGS LIGHT DIAGONAL UPPER LEFT TO LOWER RIGHT + "\u2572" + "]" # 0x005d -> RIGHT SQUARE BRACKET + "^" # 0x005e -> CIRCUMFLEX ACCENT + "\u2581" # 0x005f -> LOWER ONE EIGHTH BLOCK + "`" # 0x0060 -> GRAVE ACCENT + "a" # 0x0061 -> LATIN SMALL LETTER A + "b" # 0x0062 -> LATIN SMALL LETTER B + "c" # 0x0063 -> LATIN SMALL LETTER C + "d" # 0x0064 -> LATIN SMALL LETTER D + "e" # 0x0065 -> LATIN SMALL LETTER E + "f" # 0x0066 -> LATIN SMALL LETTER F + "g" # 0x0067 -> LATIN SMALL LETTER G + "h" # 0x0068 -> LATIN SMALL LETTER H + "i" # 0x0069 -> LATIN SMALL LETTER I + "j" # 0x006a -> LATIN SMALL LETTER J + "k" # 0x006b -> LATIN SMALL LETTER K + "l" # 0x006c -> LATIN SMALL LETTER L + "m" # 0x006d -> LATIN SMALL LETTER M + "n" # 0x006e -> LATIN SMALL LETTER N + "o" # 0x006f -> LATIN SMALL LETTER O + "p" # 0x0070 -> LATIN SMALL LETTER P + "q" # 0x0071 -> LATIN SMALL LETTER Q + "r" # 0x0072 -> LATIN SMALL LETTER R + "s" # 0x0073 -> LATIN SMALL LETTER S + "t" # 0x0074 -> LATIN SMALL LETTER T + "u" # 0x0075 -> LATIN SMALL LETTER U + "v" # 0x0076 -> LATIN SMALL LETTER V + "w" # 0x0077 -> LATIN SMALL LETTER W + "x" # 0x0078 -> LATIN SMALL LETTER X + "y" # 0x0079 -> LATIN SMALL LETTER Y + "z" # 0x007a -> LATIN SMALL LETTER Z + "{" # 0x007b -> LEFT CURLY BRACKET + "\u2502" # 0x007c -> BOX DRAWINGS LIGHT VERTICAL + "}" # 0x007d -> RIGHT CURLY BRACKET + "~" # 0x007e -> TILDE + "\u259e" # 0x007f -> QUADRANT UPPER RIGHT AND LOWER LEFT + "\u2b1c" # 0x0080 -> WHITE LARGE SQUARE + "\u2b1c" # 0x0081 -> WHITE LARGE SQUARE + "\u2b1c" # 0x0082 -> WHITE LARGE SQUARE + "\u2b1c" # 0x0083 -> WHITE LARGE SQUARE + "\u2b1c" # 0x0084 -> WHITE LARGE SQUARE + "\u2b1c" # 0x0085 -> WHITE LARGE SQUARE + "\u2b1c" # 0x0086 -> WHITE LARGE SQUARE + "\u2b1c" # 0x0087 -> WHITE LARGE SQUARE + "\u2b1c" # 0x0088 -> WHITE LARGE SQUARE + "\u2b1c" # 0x0089 -> WHITE LARGE SQUARE + "\u2b1c" # 0x008a -> WHITE LARGE SQUARE + "\u2b1c" # 0x008b -> WHITE LARGE SQUARE + "\u2b1c" # 0x008c -> WHITE LARGE SQUARE + "\u2b1c" # 0x008d -> WHITE LARGE SQUARE + "\u2b1c" # 0x008e -> WHITE LARGE SQUARE + "\u2b1c" # 0x008f -> WHITE LARGE SQUARE + "\u2b1c" # 0x0090 -> WHITE LARGE SQUARE + "\u2b1c" # 0x0091 -> WHITE LARGE SQUARE + "\u2b1c" # 0x0092 -> WHITE LARGE SQUARE + "\u2b1c" # 0x0093 -> WHITE LARGE SQUARE + "\u2b1c" # 0x0094 -> WHITE LARGE SQUARE + "\u2b1c" # 0x0095 -> WHITE LARGE SQUARE + "\u2b1c" # 0x0096 -> WHITE LARGE SQUARE + "\u2b1c" # 0x0097 -> WHITE LARGE SQUARE + "\u2b1c" # 0x0098 -> WHITE LARGE SQUARE + "\u2b1c" # 0x0099 -> WHITE LARGE SQUARE + "\u2b1c" # 0x009a -> WHITE LARGE SQUARE + "\u2b1c" # 0x009b -> WHITE LARGE SQUARE + "\u2b1c" # 0x009c -> WHITE LARGE SQUARE + "\u2b1c" # 0x009d -> WHITE LARGE SQUARE + "\u2b1c" # 0x009e -> WHITE LARGE SQUARE + "\u2b1c" # 0x009f -> WHITE LARGE SQUARE + "\xa0" # 0x00a0 -> NO-BREAK SPACE + "\xa1" # 0x00a1 -> INVERTED EXCLAMATION MARK + "\xa2" # 0x00a2 -> CENT SIGN + "\xa3" # 0x00a3 -> POUND SIGN + "\xa4" # 0x00a4 -> CURRENCY SIGN + "\xa5" # 0x00a5 -> YEN SIGN + "\xa6" # 0x00a6 -> BROKEN BAR + "\xa7" # 0x00a7 -> SECTION SIGN + "\xa8" # 0x00a8 -> DIAERESIS + "\xa9" # 0x00a9 -> COPYRIGHT SIGN + "\xaa" # 0x00aa -> FEMININE ORDINAL INDICATOR + "\xab" # 0x00ab -> LEFT-POINTING DOUBLE ANGLE QUOTATION MARK + "\xac" # 0x00ac -> NOT SIGN + "\xad" # 0x00ad -> SOFT HYPHEN + "\xae" # 0x00ae -> REGISTERED SIGN + "\xaf" # 0x00af -> MACRON + "\xb0" # 0x00b0 -> DEGREE SIGN + "\xb1" # 0x00b1 -> PLUS-MINUS SIGN + "\xb2" # 0x00b2 -> SUPERSCRIPT TWO + "\xb3" # 0x00b3 -> SUPERSCRIPT THREE + "\xb4" # 0x00b4 -> ACUTE ACCENT + "\xb5" # 0x00b5 -> MICRO SIGN + "\xb6" # 0x00b6 -> PILCROW SIGN + "\xb7" # 0x00b7 -> MIDDLE DOT + "\xb8" # 0x00b8 -> CEDILLA + "\xb9" # 0x00b9 -> SUPERSCRIPT ONE + "\xba" # 0x00ba -> MASCULINE ORDINAL INDICATOR + "\xbb" # 0x00bb -> RIGHT-POINTING DOUBLE ANGLE QUOTATION MARK + "\xbc" # 0x00bc -> VULGAR FRACTION ONE QUARTER + "\xbd" # 0x00bd -> VULGAR FRACTION ONE HALF + "\xbe" # 0x00be -> VULGAR FRACTION THREE QUARTERS + "\xbf" # 0x00bf -> INVERTED QUESTION MARK + "\xc0" # 0x00c0 -> LATIN CAPITAL LETTER A WITH GRAVE + "\xc1" # 0x00c1 -> LATIN CAPITAL LETTER A WITH ACUTE + "\xc2" # 0x00c2 -> LATIN CAPITAL LETTER A WITH CIRCUMFLEX + "\xc3" # 0x00c3 -> LATIN CAPITAL LETTER A WITH TILDE + "\xc4" # 0x00c4 -> LATIN CAPITAL LETTER A WITH DIAERESIS + "\xc5" # 0x00c5 -> LATIN CAPITAL LETTER A WITH RING ABOVE + "\xc6" # 0x00c6 -> LATIN CAPITAL LETTER AE + "\xc7" # 0x00c7 -> LATIN CAPITAL LETTER C WITH CEDILLA + "\xc8" # 0x00c8 -> LATIN CAPITAL LETTER E WITH GRAVE + "\xc9" # 0x00c9 -> LATIN CAPITAL LETTER E WITH ACUTE + "\xca" # 0x00ca -> LATIN CAPITAL LETTER E WITH CIRCUMFLEX + "\xcb" # 0x00cb -> LATIN CAPITAL LETTER E WITH DIAERESIS + "\xcc" # 0x00cc -> LATIN CAPITAL LETTER I WITH GRAVE + "\xcd" # 0x00cd -> LATIN CAPITAL LETTER I WITH ACUTE + "\xce" # 0x00ce -> LATIN CAPITAL LETTER I WITH CIRCUMFLEX + "\xcf" # 0x00cf -> LATIN CAPITAL LETTER I WITH DIAERESIS + "\xd0" # 0x00d0 -> LATIN CAPITAL LETTER ETH + "\xd1" # 0x00d1 -> LATIN CAPITAL LETTER N WITH TILDE + "\xd2" # 0x00d2 -> LATIN CAPITAL LETTER O WITH GRAVE + "\xd3" # 0x00d3 -> LATIN CAPITAL LETTER O WITH ACUTE + "\xd4" # 0x00d4 -> LATIN CAPITAL LETTER O WITH CIRCUMFLEX + "\xd5" # 0x00d5 -> LATIN CAPITAL LETTER O WITH TILDE + "\xd6" # 0x00d6 -> LATIN CAPITAL LETTER O WITH DIAERESIS + "\xd7" # 0x00d7 -> MULTIPLICATION SIGN + "\xd8" # 0x00d8 -> LATIN CAPITAL LETTER O WITH STROKE + "\xd9" # 0x00d9 -> LATIN CAPITAL LETTER U WITH GRAVE + "\xda" # 0x00da -> LATIN CAPITAL LETTER U WITH ACUTE + "\xdb" # 0x00db -> LATIN CAPITAL LETTER U WITH CIRCUMFLEX + "\xdc" # 0x00dc -> LATIN CAPITAL LETTER U WITH DIAERESIS + "\xdd" # 0x00dd -> LATIN CAPITAL LETTER Y WITH ACUTE + "\xde" # 0x00de -> LATIN CAPITAL LETTER THORN + "\xdf" # 0x00df -> LATIN SMALL LETTER SHARP S + "\xe0" # 0x00e0 -> LATIN SMALL LETTER A WITH GRAVE + "\xe1" # 0x00e1 -> LATIN SMALL LETTER A WITH ACUTE + "\xe2" # 0x00e2 -> LATIN SMALL LETTER A WITH CIRCUMFLEX + "\xe3" # 0x00e3 -> LATIN SMALL LETTER A WITH TILDE + "\xe4" # 0x00e4 -> LATIN SMALL LETTER A WITH DIAERESIS + "\xe5" # 0x00e5 -> LATIN SMALL LETTER A WITH RING ABOVE + "\xe6" # 0x00e6 -> LATIN SMALL LETTER AE + "\xe7" # 0x00e7 -> LATIN SMALL LETTER C WITH CEDILLA + "\xe8" # 0x00e8 -> LATIN SMALL LETTER E WITH GRAVE + "\xe9" # 0x00e9 -> LATIN SMALL LETTER E WITH ACUTE + "\xea" # 0x00ea -> LATIN SMALL LETTER E WITH CIRCUMFLEX + "\xeb" # 0x00eb -> LATIN SMALL LETTER E WITH DIAERESIS + "\xec" # 0x00ec -> LATIN SMALL LETTER I WITH GRAVE + "\xed" # 0x00ed -> LATIN SMALL LETTER I WITH ACUTE + "\xee" # 0x00ee -> LATIN SMALL LETTER I WITH CIRCUMFLEX + "\xef" # 0x00ef -> LATIN SMALL LETTER I WITH DIAERESIS + "\xf0" # 0x00f0 -> LATIN SMALL LETTER ETH + "\xf1" # 0x00f1 -> LATIN SMALL LETTER N WITH TILDE + "\xf2" # 0x00f2 -> LATIN SMALL LETTER O WITH GRAVE + "\xf3" # 0x00f3 -> LATIN SMALL LETTER O WITH ACUTE + "\xf4" # 0x00f4 -> LATIN SMALL LETTER O WITH CIRCUMFLEX + "\xf5" # 0x00f5 -> LATIN SMALL LETTER O WITH TILDE + "\xf6" # 0x00f6 -> LATIN SMALL LETTER O WITH DIAERESIS + "\xf7" # 0x00f7 -> DIVISION SIGN + "\xf8" # 0x00f8 -> LATIN SMALL LETTER O WITH STROKE + "\xf9" # 0x00f9 -> LATIN SMALL LETTER U WITH GRAVE + "\xfa" # 0x00fa -> LATIN SMALL LETTER U WITH ACUTE + "\xfb" # 0x00fb -> LATIN SMALL LETTER U WITH CIRCUMFLEX + "\xfc" # 0x00fc -> LATIN SMALL LETTER U WITH DIAERESIS + "\xfd" # 0x00fd -> LATIN SMALL LETTER Y WITH ACUTE + "\xfe" # 0x00fe -> LATIN SMALL LETTER THORN + "\xff" # 0x00ff -> LATIN SMALL LETTER Y WITH DIAERESIS +) diff --git a/xthulu/ssh/console/__init__.py b/xthulu/ssh/console/__init__.py new file mode 100644 index 0000000..4b63aac --- /dev/null +++ b/xthulu/ssh/console/__init__.py @@ -0,0 +1,43 @@ +# typing +from typing import Any, Mapping + +# 3rd party +from asyncssh import SSHWriter +from rich.console import Console + +# local +from .file_wrapper import FileWrapper + + +class XthuluConsole(Console): + _encoding: str + + def __init__( + self, + *, + encoding: str, + height: int | None = None, + ssh_writer: SSHWriter[Any], + width: int | None = None, + _environ: Mapping[str, str] | None = None, + **kwargs, + ): + self.encoding = encoding + super().__init__( + **kwargs, + file=FileWrapper(ssh_writer, encoding), + force_interactive=True, + force_terminal=True, + highlight=False, + width=width, + height=height, + _environ=_environ, + ) + + @property + def encoding(self): + return self._encoding + + @encoding.setter + def encoding(self, val: str): + self._encoding = val diff --git a/xthulu/ssh/console/app.py b/xthulu/ssh/console/app.py new file mode 100644 index 0000000..5c98cd5 --- /dev/null +++ b/xthulu/ssh/console/app.py @@ -0,0 +1,53 @@ +# stdlib +from asyncio import sleep + +# 3rd party +from textual import events +from textual.app import App +from textual.geometry import Size + +from ...events.structs import EventData + +# local +from ..context import SSHContext + + +class XthuluApp(App): + + """SSH wrapper for Textual apps""" + + context: SSHContext + """The current SSH context""" + + def __init__(self, context: SSHContext, **kwargs): + # avoid cyclic import + from .driver import SSHDriver + + self.context = context + super().__init__(driver_class=SSHDriver, **kwargs) + self.console = context.term + self.error_console = None + self.run_worker(self._watch_for_resize, exclusive=True) + + async def _watch_for_resize(self): + while True: + ev: list[EventData] = self.context.events.get( + "resize" + ) # type: ignore + + if not ev: + await sleep(0.5) + continue + + new_size = Size(*ev[-1].data) + self._driver.process_event(events.Resize(new_size, new_size)) + + def exit(self, **kwargs) -> None: + # avoid cyclic import + from .driver import SSHDriver + + super().exit(**kwargs) + self._driver: SSHDriver + self._driver._disable_bracketed_paste() + self._driver._disable_mouse_support() + self._driver.exit_event.set() diff --git a/xthulu/ssh/console/art.py b/xthulu/ssh/console/art.py new file mode 100644 index 0000000..a012fa6 --- /dev/null +++ b/xthulu/ssh/console/art.py @@ -0,0 +1,124 @@ +# stdlib +from asyncio import QueueEmpty, sleep +from re import Match, sub + +# 3rd party +import aiofiles as aiof +from textual import events +from textual.widgets import Log + +# local +from ..context import SSHContext +from .app import XthuluApp + +FIND_CUF_REGEX = r"\x1b\[(\d+)C" + + +class ArtLog(XthuluApp): + """Displays artwork""" + + artwork: list[str] + delay: float + + def __init__( + self, + context: SSHContext, + artwork: list[str], + delay: float = 0.1, + **kwargs, + ): + self.artwork = artwork + self.delay = delay + super().__init__(context, **kwargs) + self.run_worker(self._worker, exclusive=True) + + @property + def scrollbars_enabled(self) -> bool: + return False + + def compose(self): + yield Log() + + async def _worker(self): + artlog: Log = self.query_one(Log) + + for line in self.artwork: + if self._exit: + return + + artlog.write_line(line) + + try: + self.context.input.get_nowait() + break + except QueueEmpty: + await sleep(0.1) + else: + await sleep(1) + + self.exit() + + async def on_key(self, _: events.Key): + self.exit() + + +def _replace_cuf(match: Match[str]): + return " " * int(match.group(1)) + + +def normalize_ansi(text: str): + """Replace CUF sequences with spaces.""" + + return sub(FIND_CUF_REGEX, _replace_cuf, text) + + +async def load_art(path: str, encoding="cp437"): + """Load normalized, properly-encoded artwork files.""" + + async with aiof.open(path, encoding=encoding) as f: + artwork = [normalize_ansi(line) for line in await f.readlines()] + + return artwork + + +async def scroll_art_app( + context: SSHContext, path: str, encoding="cp437", delay=0.1 +): + """Display ANSI artwork in a scrolling Log panel.""" + + artwork = await load_art(path, encoding) + await ArtLog(context, artwork, delay).run_async() + + +async def scroll_art( + context: SSHContext, path: str, encoding="cp437", delay=0.1 +): + """Display ANSI artwork directly to the console.""" + + artwork = await load_art(path, encoding) + + # show entire piece immediately if shorter than terminal + if context.term.height >= len(artwork): + context.term.out(*artwork) + return + + for line in artwork: + context.term.out( + line, + end="", + highlight=False, + ) + + try: + context.input.get_nowait() + break + except QueueEmpty: + await sleep(delay) + else: + await sleep(1) + + +async def show_art(context: SSHContext, path: str, encoding="cp437"): + """Display ANSI artwork directly to the console without scrolling.""" + + context.term.out(*(await load_art(path, encoding))) diff --git a/xthulu/ssh/console/driver.py b/xthulu/ssh/console/driver.py new file mode 100644 index 0000000..c9c963d --- /dev/null +++ b/xthulu/ssh/console/driver.py @@ -0,0 +1,61 @@ +# 3rd party +from asyncio import run_coroutine_threadsafe +from codecs import getincrementaldecoder +from textual._parser import ParseError +from textual._xterm_parser import XTermParser +from textual.drivers.linux_driver import LinuxDriver + +# local +from ..context import SSHContext +from .app import XthuluApp + + +class SSHDriver(LinuxDriver): + context: SSHContext + + def __init__(self, app: XthuluApp, **kwargs) -> None: + self.context = app.context + + if "size" in kwargs: + del kwargs["size"] + + super().__init__( + app, + size=self._get_terminal_size(), + **kwargs, + ) + + def _get_terminal_size(self) -> tuple[int, int]: + return (self.context.term.width, self.context.term.height) + + def flush(self) -> None: + pass + + def run_input_thread(self) -> None: + parser = XTermParser(self.context.proc.stdin.at_eof, self._debug) + feed = parser.feed + decode = getincrementaldecoder("utf-8")().decode + + while not self.exit_event.is_set(): + try: + r = run_coroutine_threadsafe( + self.context.input.get(), self._loop + ).result() + unicode_data = decode(r) + + for event in feed(unicode_data): + self.process_event(event) + + except ParseError: + # process is likely closing; end the loop + return + + # avoid input debuffering bug on close + self.context.proc.stdin.feed_data(b"\r\n") + + def write(self, data: str) -> None: + try: + self.context.proc.stdout.write(data.encode(self.context.encoding)) + except BrokenPipeError: + # process is likely closing + pass diff --git a/xthulu/ssh/console/file_wrapper.py b/xthulu/ssh/console/file_wrapper.py new file mode 100644 index 0000000..b81124c --- /dev/null +++ b/xthulu/ssh/console/file_wrapper.py @@ -0,0 +1,24 @@ +# typing +from typing import Any, IO + +# 3rd party +from asyncssh import SSHWriter + + +class FileWrapper(IO[str]): + _encoding: str + _wrapped: SSHWriter[Any] + + def __init__(self, wrapped: SSHWriter[Any], encoding: str): + self._encoding = encoding + self._wrapped = wrapped + super().__init__() + + def write(self, string: str): + try: + self._wrapped.write( + string.replace("\n", "\r\n").encode(self._encoding) + ) + except BrokenPipeError: + # process is likely closing + pass diff --git a/xthulu/ssh/console/input.py b/xthulu/ssh/console/input.py new file mode 100644 index 0000000..5dbeb40 --- /dev/null +++ b/xthulu/ssh/console/input.py @@ -0,0 +1,50 @@ +# stdlib +from asyncio import QueueEmpty, sleep, wait_for + +# local +from ..context import SSHContext + + +async def wait_for_key(cx: SSHContext, text="", spinner="dots", timeout=0.0): + """Wait for (and return) a keypress.""" + + async def _wait(): + seq = [] + + while True: + try: + key = cx.input.get_nowait() + seq.append(key) + + # wait for next char in ESC sequence (arrow keys, etc.) + if key == b"\x1b": + try: + key = await wait_for(cx.input.get(), 0.2) + seq.append(key) + except TimeoutError: + pass + + break + else: + break + + except QueueEmpty: + await sleep(0.01) + + return b"".join(seq) + + try: + if text == "": + if timeout > 0.0: + return await wait_for(_wait(), timeout) + + return await _wait() + + with cx.term.status(text, spinner=spinner): + if timeout > 0.0: + return await wait_for(_wait(), timeout) + + return await _wait() + + except TimeoutError: + pass diff --git a/xthulu/ssh/context/__init__.py b/xthulu/ssh/context/__init__.py index a7d70a5..b84d2b9 100644 --- a/xthulu/ssh/context/__init__.py +++ b/xthulu/ssh/context/__init__.py @@ -5,7 +5,7 @@ from types import ModuleType # stdlib -from asyncio import sleep +from asyncio import Queue, sleep from codecs import decode from functools import partial, singledispatch from importlib.abc import Loader @@ -24,9 +24,9 @@ from ...events import EventQueue from ...logger import log from ...models import User +from ..console import XthuluConsole from ..exceptions import Goto, ProcessClosing from ..structs import Script -from ..terminal.proxy_terminal import ProxyTerminal from .lock_manager import _LockManager from .log_filter import ContextLogFilter @@ -41,10 +41,13 @@ class SSHContext: proc: SSHServerProcess """This context's containing process""" + input: Queue + """This context's input queue""" + stack: list[Script] = [] """Script stack""" - term: ProxyTerminal + term: XthuluConsole """Context terminal object""" encoding: str @@ -96,6 +99,7 @@ async def create(cls, proc: SSHServerProcess, encoding="utf-8"): self.ip = self._peername[0] self.whoami = f"{self.username}@{self.ip}" self.proc = proc + self.input = Queue(1024) self.encoding = encoding self.log = logging.getLogger(self.sid) self.events = EventQueue(self.sid) @@ -143,7 +147,10 @@ def echo(self, *args: str, encoding: Optional[str] = None): ) text.append(encoded) - self.proc.stdout.write("".join(text).encode(self.encoding)) + if encoding is not None: + self.proc.stdout.write("".join(text).encode(self.encoding)) + else: + self.term.print("".join(text), sep="", end="") async def gosub(self, script: str, *args, **kwargs) -> Any: """ @@ -284,12 +291,8 @@ async def runscript(self, script: Script) -> Any: raise except Exception: message = f"Exception in script {script.name}" - self.log.exception(message) - self.echo( - self.term.normal, - "\r\n", - self.term.bright_white_on_red(f" {message} "), - self.term.normal, - "\r\n", - ) + log.exception(message, extra={"user": self.username}) + self.echo(f"\n\n[bright_white on red] {message} [/]\n\n") await sleep(3) + # avoid input debuffering bug if a Textual app crashed + self.proc.stdin.feed_data(b"\x1b") diff --git a/xthulu/ssh/process_factory.py b/xthulu/ssh/process_factory.py index 4a0d422..6fce29e 100644 --- a/xthulu/ssh/process_factory.py +++ b/xthulu/ssh/process_factory.py @@ -1,10 +1,8 @@ """SSH server process factory""" # stdlib -from asyncio import gather, IncompleteReadError, Queue, TimeoutError, wait_for +from asyncio import gather, IncompleteReadError, wait_for from datetime import datetime -from multiprocessing.connection import Pipe -from multiprocessing import Process # 3rd party from asyncssh import SSHServerProcess, TerminalSizeChanged @@ -13,11 +11,10 @@ from ..configuration import get_config from ..events.structs import EventData from ..logger import log +from .console import XthuluConsole from .context import SSHContext from .exceptions import Goto, ProcessClosing from .structs import Script -from .terminal import terminal_process -from .terminal.proxy_terminal import ProxyTerminal async def handle_client(proc: SSHServerProcess): @@ -54,70 +51,58 @@ async def handle_client(proc: SSHServerProcess): if "TERM" not in cx.env: cx.env["TERM"] = termtype - w, h, pw, ph = proc.get_terminal_size() + w, h, _, _ = proc.get_terminal_size() cx.env["COLUMNS"] = str(w) cx.env["LINES"] = str(h) - proxy_pipe, subproc_pipe = Pipe() - session_stdin = Queue() - timeout = int(get_config("ssh.session.timeout", 120)) await cx.user.update(last=datetime.utcnow()).apply() # type: ignore async def input_loop(): - """Catch exceptions on stdin and convert to EventData.""" + timeout = int(get_config("ssh.session.timeout", 120)) - while True: + while not proc.is_closing(): try: if timeout > 0: - r = await wait_for(proc.stdin.readexactly(1), timeout) + r = await wait_for(proc.stdin.read(1024), timeout) else: - r = await proc.stdin.readexactly(1) + r = await proc.stdin.read(1024) - await session_stdin.put(r) + await cx.input.put(r) except IncompleteReadError: - # process is likely closing; end the loop - return + # process is likely closing + break except TimeoutError: log.warning(f"{cx.whoami} timed out") - cx.echo( - cx.term.normal, - "\r\n", - cx.term.bright_white_on_red(" TIMED OUT "), - cx.term.normal, - "\r\n", - ) - proc.channel.close() - proc.close() - - return + cx.echo("\n\n[bright_white on red] TIMED OUT [/]\n\n") + break except TerminalSizeChanged as sz: cx.env["COLUMNS"] = str(sz.width) cx.env["LINES"] = str(sz.height) - cx.term._width = sz.width - cx.term._height = sz.height - cx.term._pixel_width = sz.pixwidth - cx.term._pixel_height = sz.pixheight + cx.term.width = sz.width + cx.term.height = sz.height cx.events.add(EventData("resize", (sz.width, sz.height))) + # disable capture of mouse events + cx.echo("\x1b[?1000l\x1b[?1003l\x1b[?1015l\x1b[?1006l") + # show cursor + cx.echo("\x1b[?25h") + + if proc.channel: + proc.channel.close() + + proc.close() + async def main_process(): """Userland script stack; main process.""" - tp = Process( - target=terminal_process, - args=(termtype, w, h, pw, ph, subproc_pipe), - ) - tp.start() - cx.term = ProxyTerminal( - session_stdin, - proc.stdout, - cx.encoding, - proxy_pipe, - w, - h, - pw, - ph, + cx.term = XthuluConsole( + encoding=cx.encoding, + height=h, + ssh_writer=proc.stdout, + width=w, + _environ=cx.env, ) # prep script stack with top scripts; # since we're treating it as a stack and not a queue, add them @@ -126,22 +111,20 @@ async def main_process(): cx.stack = [Script(s, (), {}) for s in reversed(top_names)] # main script engine loop - try: - while len(cx.stack): - try: - await cx.runscript(cx.stack.pop()) - except Goto as goto_script: - cx.stack = [goto_script.value] - except ProcessClosing: - cx.stack = [] - finally: - # send sentinel to close child 'term_pipe' process - proxy_pipe.send((None, (), {})) - - if proc.channel: - proc.channel.close() - - proc.close() + while len(cx.stack): + current = cx.stack.pop() + + try: + await cx.runscript(current) + except Goto as goto_script: + cx.stack = [goto_script.value] + except ProcessClosing: + cx.stack = [] + + if proc.channel: + proc.channel.close() + + proc.close() log.info(f"{cx.whoami} Starting terminal session") diff --git a/xthulu/ssh/terminal/__init__.py b/xthulu/ssh/terminal/__init__.py deleted file mode 100644 index e7747cd..0000000 --- a/xthulu/ssh/terminal/__init__.py +++ /dev/null @@ -1,111 +0,0 @@ -""" -Asyncio blessed.Terminal implementation - -Significant rewrites for blessed functionality thanks to -https://github.com/jquast -""" - -# stdlib -from multiprocessing.connection import Connection -import os - -# local -from ...configuration import get_config -from ...logger import log -from .subprocess import SubprocessTerminal - -debug_term = bool(get_config("debug.term", False)) -"""Whether terminal debugging is enabled""" - - -def terminal_process( - termtype: str, w: int, h: int, pw: int, ph: int, subproc_pipe: Connection -): - """ - Avoid Python curses singleton bug by stuffing Terminal in a subprocess - and proxying calls/responses via Pipe. - - Args: - termtype: The terminal type. - w: The width in columns. - h: The height in rows. - pw: The width in pixels. - ph: The height in pixels. - subproc_pipe: The subprocess pipe for communication. - """ - - subproc_term = SubprocessTerminal(termtype, w, h, pw, ph) - - while True: - try: - given_attr, args, kwargs = subproc_pipe.recv() - except KeyboardInterrupt: - return - - if debug_term: - log.debug(f"proxy received: {given_attr}, {args!r}, " f"{kwargs!r}") - - # exit sentinel - if given_attr is None: - if debug_term: - log.debug(f"term={subproc_term}/pid={os.getpid()} exit") - - break - - # special attribute -- a context manager, enter it naturally, exit - # unnaturally (even, prematurely), with the exit value ready for - # our client side, this is only possible because blessed doesn't - # use any state or time-sensitive values, only terminal sequences, - # and these CM's are the only ones without side-effects. - if given_attr.startswith("!CTX"): - # here, we feel the real punishment of side-effects... - sideeffect_stream = subproc_term.stream.getvalue() # type: ignore - assert not sideeffect_stream, ("should be empty", sideeffect_stream) - - given_attr = given_attr[len("!CTX") :] - - if debug_term: - log.debug(f"context attr: {given_attr}") - - with getattr(subproc_term, given_attr)( - *args, **kwargs - ) as enter_result: - enter_side_effect = ( - subproc_term.stream.getvalue() # type: ignore - ) - subproc_term.stream.truncate(0) - subproc_term.stream.seek(0) - - if debug_term: - log.debug( - "enter_result, enter_side_effect = " - f"{enter_result!r}, {enter_side_effect!r}" - ) - - subproc_pipe.send((enter_result, enter_side_effect)) - - exit_side_effect = subproc_term.stream.getvalue() # type: ignore - subproc_term.stream.truncate(0) - subproc_term.stream.seek(0) - subproc_pipe.send(exit_side_effect) - - elif given_attr.startswith("!CALL"): - given_attr = given_attr[len("!CALL") :] - matching_attr = getattr(subproc_term, given_attr) - - if debug_term: - log.debug(f"callable attr: {given_attr}") - - subproc_pipe.send(matching_attr(*args, **kwargs)) - - else: - if debug_term: - log.debug(f"attr: {given_attr}") - - assert len(args) == len(kwargs) == 0, (args, kwargs) - matching_attr = getattr(subproc_term, given_attr) - - if debug_term: - log.debug(f"value: {matching_attr!r}") - - subproc_pipe.send(matching_attr) diff --git a/xthulu/ssh/terminal/constants.py b/xthulu/ssh/terminal/constants.py deleted file mode 100644 index 524ed9d..0000000 --- a/xthulu/ssh/terminal/constants.py +++ /dev/null @@ -1,3 +0,0 @@ -"""Terminal/keystroke constants""" - -CTRL_C = chr(0x03) diff --git a/xthulu/ssh/terminal/proxy_call.py b/xthulu/ssh/terminal/proxy_call.py deleted file mode 100644 index 0908036..0000000 --- a/xthulu/ssh/terminal/proxy_call.py +++ /dev/null @@ -1,28 +0,0 @@ -"""Terminal proxy call wrapper""" - -# type checking -from typing import Callable - -# stdlib -from multiprocessing.connection import Connection - -# 3rd party -from wrapt import ObjectProxy - - -class TerminalProxyCall(ObjectProxy): - - """Wrapped terminal call to be proxied""" - - def __init__(self, wrapped: Callable, attr: str, pipe_master: Connection): - super().__init__(wrapped) - self.pipe_master = pipe_master - self.attr = attr - - def __call__(self, *args, **kwargs): - self.pipe_master.send((f"!CALL{self.attr}", args, kwargs)) - - return self.pipe_master.recv() - - def __str__(self): - return self.__call__() diff --git a/xthulu/ssh/terminal/proxy_terminal.py b/xthulu/ssh/terminal/proxy_terminal.py deleted file mode 100644 index dd19a5e..0000000 --- a/xthulu/ssh/terminal/proxy_terminal.py +++ /dev/null @@ -1,265 +0,0 @@ -"""Proxied blessed.Terminal""" - -# type checking -from typing import Any - -# stdlib -from asyncio import IncompleteReadError, TimeoutError, Queue, wait_for -import contextlib -from multiprocessing.connection import Connection - -# 3rd party -from blessed import Terminal -from blessed.keyboard import Keystroke -from blessed.formatters import FormattingOtherString, ParameterizingString - -# local -from ...configuration import get_config -from ...logger import log -from ..exceptions import ProcessClosing -from .proxy_call import TerminalProxyCall - -debug_term = bool(get_config("debug.term", False)) -"""Whether terminal debugging is enabled""" - - -class ProxyTerminal: - - """ - Terminal implementation which proxies calls to a `blessed.Terminal` instance - running in a subprocess - """ - - _kbdbuf = [] - - # context manager attribs - _ctxattrs = ( - "cbreak", - "fullscreen", - "hidden_cursor", - "keypad", - "location", - "raw", - ) - - # their type hints - cbreak: contextlib._GeneratorContextManager[Any] - fullscreen: contextlib._GeneratorContextManager[Any] - hidden_cursor: contextlib._GeneratorContextManager[Any] - keypad: contextlib._GeneratorContextManager[Any] - location: contextlib._GeneratorContextManager[Any] - raw: contextlib._GeneratorContextManager[Any] - - # other type hints for non-callable proxies - normal: str - - def __init__( - self, - stdin: Queue[bytes], - stdout: Any, - encoding: str, - pipe_master: Connection, - width: int = 0, - height: int = 0, - pixel_width: int = 0, - pixel_height: int = 0, - ): - self.stdin, self.stdout = stdin, stdout - self.encoding = encoding - self.pipe_master = pipe_master - self._width = width - self._height = height - self._pixel_width = pixel_width - self._pixel_height = pixel_height - - def __getattr__(self, attr: str) -> TerminalProxyCall: - @contextlib.contextmanager - def proxy_contextmanager(*args, **kwargs): - # we send special '!CTX' header, which means we - # expect two replies, the __enter__ and __exit__. because - # context managers can be wrapped, and entry/exit can happen - # in like entry/entry/entry/exit/exit/exit order, we *prefetch* - # any exit value and return code -- woah! not a problem because - # the things we wrap are pretty basic - self.pipe_master.send((f"!CTX{attr}", args, kwargs)) - - # one of two items, the '__enter__' context, - enter_side_effect, enter_value = self.pipe_master.recv() - exit_side_effect = self.pipe_master.recv() - - if debug_term: - log.debug( - f"wrap_ctx_manager({attr}, *{args}, **{kwargs}) " - f"=> entry: {enter_side_effect}, {enter_value})" - ) - log.debug( - f"wrap_ctx_manager({attr}, *{args}, **{kwargs}) " - f"=> exit: {exit_side_effect}" - ) - - if enter_side_effect: - self.stdout.write(enter_side_effect) - - yield enter_value - - if exit_side_effect: - self.stdout.write(exit_side_effect) - - if attr in self._ctxattrs: - return proxy_contextmanager # type: ignore - - blessed_attr = getattr(Terminal, attr, None) - - if callable(blessed_attr): - if debug_term: - log.debug(f"{attr} callable") - - resolved_value = TerminalProxyCall( - blessed_attr, attr, self.pipe_master - ) - - if debug_term: - log.debug(f"value: {resolved_value!r}") - else: - if debug_term: - log.debug(f"{attr} not callable") - - self.pipe_master.send((attr, (), {})) - resolved_value = self.pipe_master.recv() - - if debug_term: - log.debug(f"value: {resolved_value!r}") - - if isinstance( - resolved_value, - ( - ParameterizingString, - FormattingOtherString, - ), - ): - resolved_value = TerminalProxyCall( - resolved_value, attr, self.pipe_master - ) - if debug_term: - log.debug(repr(resolved_value)) - - if debug_term: - log.debug(f"setattr {attr}") - - setattr(self, attr, resolved_value) - - return resolved_value - - def does_styling(self): - return True - - does_styling.__doc__ = Terminal.does_styling.__doc__ - - @property - def pixel_width(self): - return self._pixel_width - - pixel_width.__doc__ = Terminal.pixel_width.__doc__ - - @property - def pixel_height(self): - return self._pixel_height - - pixel_height.__doc__ = Terminal.pixel_height.__doc__ - - @property - def height(self): - return self._height - - height.__doc__ = Terminal.height.__doc__ - - @property - def width(self): - return self._width - - width.__doc__ = Terminal.width.__doc__ - - async def inkey( - self, timeout: float | None = None, esc_delay: float = 0.35 - ): - ucs = "" - - # get anything currently in kbd buffer - for c in self._kbdbuf: - ucs += c - - self._kbdbuf.clear() - ks: Keystroke = ( - self.resolve(text=ucs) if len(ucs) else Keystroke() - ) # type: ignore - - # either buffer was empty or we don't have enough for a keystroke; - # wait for input from kbd - if not ks: - while True: - try: - inp: bytes - - if timeout is None: - # don't actually wait indefinitely; wait in 0.1 second - # increments so that the coroutine can be aborted if - # the connection is dropped - inp = await wait_for(self.stdin.get(), 0.1) - else: - inp = await wait_for(self.stdin.get(), timeout) - - try: - ucs += inp.decode(self.encoding) - break - - except UnicodeDecodeError: - # possible multibyte unicode symbol - bytebuf = [inp] - - while True: - try: - inp = await wait_for( - self.stdin.get(), esc_delay - ) - bytebuf.append(inp) - except TimeoutError: - break - - ucs += b"".join(bytebuf).decode(self.encoding) - break - - except IncompleteReadError: - raise ProcessClosing() - - except TimeoutError: - if timeout is not None: - break - - ks = ( - self.resolve(text=ucs) if len(ucs) else Keystroke() - ) # type: ignore - - if ks.code == self.KEY_ESCAPE: - # esc was received; let's see if we're getting a key sequence - while ucs in self._keymap_prefixes: # type: ignore - try: - inp = await wait_for(self.stdin.get(), esc_delay) - ucs += inp.decode(self.encoding) - - except IncompleteReadError: - raise ProcessClosing() - - except TimeoutError: - break - - ks = ( - self.resolve(text=ucs) if len(ucs) else Keystroke() - ) # type: ignore - - # append any remaining input back into the kbd buffer - for c in ucs[len(ks) :]: - self._kbdbuf.append(c) - - return ks - - inkey.__doc__ = Terminal.inkey.__doc__ diff --git a/xthulu/ssh/terminal/subprocess.py b/xthulu/ssh/terminal/subprocess.py deleted file mode 100644 index 5e85876..0000000 --- a/xthulu/ssh/terminal/subprocess.py +++ /dev/null @@ -1,56 +0,0 @@ -"""Subprocess terminal""" - -# stdlib -import contextlib -from functools import partial -from io import StringIO - -# 3rd party -from blessed.keyboard import resolve_sequence -from blessed.terminal import Terminal - -# local -from ...logger import log - - -class SubprocessTerminal(Terminal): - - """`blessed.Terminal` instance which runs in a subprocess""" - - def __init__( - self, - kind: str, - height: int = 0, - width: int = 0, - pixel_height: int = 0, - pixel_width: int = 0, - ): - stream = StringIO() - super().__init__(kind, stream, force_styling=True) - log.debug(f"Terminal.errors: {self.errors}") - self._keyboard_fd = "defunc" - self._height = height - self._width = width - self.resolve = partial( - resolve_sequence, - mapper=self._keymap, # type: ignore - codes=self._keycodes, # type: ignore - ) - - @contextlib.contextmanager - def raw(self): - yield - - raw.__doc__ = Terminal.raw.__doc__ - - @contextlib.contextmanager - def cbreak(self): - yield - - cbreak.__doc__ = Terminal.cbreak.__doc__ - - @property - def is_a_tty(self): - return True - - is_a_tty.__doc__ = Terminal.is_a_tty.__doc__ diff --git a/xthulu/ssh/ui/__init__.py b/xthulu/ssh/ui/__init__.py deleted file mode 100644 index 17f1304..0000000 --- a/xthulu/ssh/ui/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -"""User interface module""" - -from .art import show_art -from .editors import BlockEditor, LineEditor - -__all__ = ( - "BlockEditor", - "LineEditor", - "show_art", -) diff --git a/xthulu/ssh/ui/art.py b/xthulu/ssh/ui/art.py deleted file mode 100644 index acb1517..0000000 --- a/xthulu/ssh/ui/art.py +++ /dev/null @@ -1,130 +0,0 @@ -"""art module""" - -# stdlib -from asyncio import sleep -from os.path import exists, isfile -import re - -# 3rd party -from aiofiles import open -from blessed.keyboard import Keystroke -from blessed.sequences import SequenceTextWrapper - -# local -from ..context import SSHContext - - -async def show_art( - cx: SSHContext, - filename: str, - delay=0.125, - dismissable=True, - preload=0, - maxwidth: int | None = None, - center=False, - encoding="cp437", -) -> Keystroke | None: - """ - Display artwork from given filename with given delay between rows of - output. - - Args: - cx: The current context. - filename: The filename to load. - delay: Delay (in seconds) between rows of output. - dismissable: If the display can be prematurely ended by keypress. - preload: Number of rows to show immediately without delay - (0 for term height - 1, None for no preload). - maxwidth: The maximum number of columns to display. - center: True to center output. - encoding: The encoding to use for output. - - Returns: - The keypress and prematurely ended the display, if any. - """ - - def newline(): - cx.echo(f"{cx.term.normal}\r\n") - - if maxwidth is None or maxwidth > cx.term.width - 1: - maxwidth = cx.term.width - 1 - - assert maxwidth is not None - - if not (exists(filename) and isfile(filename)): - raise FileNotFoundError(f"Could not find {filename}") - - if preload is not None and preload <= 0: - preload = cx.term.height - 1 - - file_lines: list[str] - - async with open(filename, "r", encoding=encoding) as f: - file_lines = await f.readlines() - - lines: list[str] = [] - wrapper = SequenceTextWrapper(maxwidth, cx.term) # type: ignore - longest_line = 0 - - # \x1a is the EOF character, used to delimit SAUCE from the artwork - for line in [re.sub(r"\r|\n|\x1a.*", "", l) for l in file_lines]: - # replace CUF sequences with spaces to avoid loss of background - # color when a CUF sequence would wrap - normalized = re.sub( - r"\x1b\[(\d+)C", lambda x: " " * int(x.group(1)), line - ) - # wrap to maximum width and discard excess - wrapped = wrapper.wrap(normalized) - did_wrap = len(wrapped) > 1 - first = wrapped[0] if did_wrap else normalized - lines.append(first) - - # calculate longest line length for centering - if center and longest_line < maxwidth: - if did_wrap: - # if this line wrapped, we know it's at least maxwidth - longest_line = maxwidth - - else: - # strip sequences to get true length - stripped = re.sub(r"\x1b\[[;0-9]+m", "", first) - strlen = len(stripped) - - if strlen > longest_line: - longest_line = strlen - - center_pos = ( - 0 if not center else max(0, (cx.term.width // 2) - (longest_line // 2)) - ) - row = 0 - - for line in lines: - if center_pos > 0: - cx.echo(cx.term.move_right(center_pos)) - - cx.echo(line) - row += 1 - - if preload is not None and row < preload: - newline() - - continue - - if delay is not None and delay > 0: - if cx.events.get("resize"): - newline() - - return - - if dismissable: - ks: Keystroke = await cx.term.inkey(timeout=delay) - - if ks.code is not None: - newline() - - return ks - - else: - await sleep(delay) - - newline() diff --git a/xthulu/ssh/ui/editors/__init__.py b/xthulu/ssh/ui/editors/__init__.py deleted file mode 100644 index 2571223..0000000 --- a/xthulu/ssh/ui/editors/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -"""Editors module""" - -from .block import BlockEditor -from .line import LineEditor - -__all__ = ( - "BlockEditor", - "LineEditor", -) diff --git a/xthulu/ssh/ui/editors/block.py b/xthulu/ssh/ui/editors/block.py deleted file mode 100644 index 5b3ddd6..0000000 --- a/xthulu/ssh/ui/editors/block.py +++ /dev/null @@ -1,579 +0,0 @@ -# stdlib -from typing import Callable - -# 3rd party -from blessed.keyboard import Keystroke -from wcwidth import wcswidth - -# local -from ....logger import log -from ...terminal.proxy_terminal import ProxyTerminal - - -class BlockEditor: - - """Block editor (multiple lines)""" - - value = [] - """Editor text""" - - limit = [0, 0] - """Text length limit (width, height)""" - - corner = [None, None] - """Top-left corner of editor on screen (x, y)""" - - cursor = [0, 0] - """Cursor offset from top-left corner on screen (x, y)""" - - pos = [0, 0] - """Position within corpus of corner of editor (x, y)""" - - # internals - _color_str = "bright_white_on_blue" - _color = None - - def __init__(self, term: ProxyTerminal, rows: int, columns: int, **kwargs): - self.term = term - """Terminal to use for sequences""" - - self.rows = rows - """Height in rows""" - - self.columns = columns - """Width in columns""" - - if "value" not in kwargs: - self.value = [""] * rows - - if "color" in kwargs: - self._color_str = kwargs["color"] - del kwargs["color"] - - for k in kwargs: - setattr(self, k, kwargs[k]) - - self._color = getattr(term, self._color_str) - strlen = 0 - - if "pos" not in kwargs: - for i in range(max(0, len(self.value) - self.rows)): - if len(self.value[i]) > 0: - self.pos[1] = i - - strlen = wcswidth(self.value[self.pos[1]]) - self.pos[0] = max(0, strlen - self.columns) - - if "cursor" not in kwargs: - if strlen == 0: - strlen = wcswidth(self.value[self.pos[1]]) - - self.cursor[0] = min(self.columns - 1, max(0, strlen)) - self.cursor[1] = min(self.rows - 1, max(0, len(self.value) - 1)) - - log.debug(f"corner: {self.corner}") - log.debug(f"pos: {self.pos}") - log.debug(f"cursor: {self.cursor}") - - @property - def color(self) -> Callable: - """ - Color property; setting it also sets the internal Terminal callable. - """ - - return getattr(self.term, self._color_str) - - @color.setter - def color(self, val): - self._color_str = val - self._color = getattr(self.term, val) - - @property - def at_end(self) -> bool: - """Whether the cursor is at the end of the editor.""" - - if self.limit[0] == 0: - return False - - return ( - self.cursor[0] >= self.columns - 1 - and wcswidth(self.value[self.pos[1]]) >= self.limit[0] - ) - - @property - def edge_diff(self) -> int: - """How far the cursor is from the edge of the current row.""" - - strlen = wcswidth(self.value[self.cursor[1] + self.pos[1]]) - - return self.pos[0] + self.cursor[0] - strlen - - def redraw(self, cursor=True, anchor=True) -> str: - """ - Output sequence to redraw editor. - - Args: - cursor: Redraw cursor position as well. - anchor: Reset anchor position as well. - - Returns: - The output string to redraw the editor/cursor. - """ - - out = "" - left = self.pos[0] - top = self.pos[1] - travel = 0 - - if anchor: - out += self.reset_anchor() - - for i in range(self.rows): - if i > 0: - out += "\r\n" - travel += 1 - - if self.corner[0] is not None: - out += self.term.move_x(self.corner[0]) - - text = self.value[top + i][left : left + self.columns] - out += self.color(text) - out += self.color(" " * (self.columns - wcswidth(text))) - out += self.term.move_left(self.columns) - - log.debug("redrawing editor") - - if cursor: - if travel > 0: - out += self.term.move_up(travel) - - out += self.redraw_cursor() - - return out - - def redraw_cursor(self) -> str: - """ - Output sequence to restore cursor position; assumes cursor is already - located at top-left of editor if self.corner is unset. - - Returns: - The output sequence to redraw the cursor. - """ - - out = "" - - # move cursor back to top left before adjusting to self.cursor offset - if self.corner[0] is not None: - out += self.term.move_x(self.corner[0]) - - if self.corner[1] is not None: - out += self.term.move_y(self.corner[1]) - - # adjust offset - if self.cursor[0] > 0: - out += self.term.move_right(self.cursor[0]) - - if self.cursor[1] > 0: - out += self.term.move_down(self.cursor[1]) - - log.debug("redrawing cursor") - - return out - - def reset_anchor(self) -> str: - """ - Assuming the cursor has not moved since the editor was responsible for - it, this will reset the on-screen cursor to the top-left corner of the - editor. - - Returns: - The output sequence to reset the anchor. - """ - - out = "" - - if self.cursor[0] > 0: - out += self.term.move_left(self.cursor[0]) - - if self.cursor[1] > 0: - out += self.term.move_up(self.cursor[1]) - - log.debug("resetting anchor") - - return out - - def reset(self): - """Reset the editor's value, cursor, and offset position.""" - - self.cursor = [0, 0] - self.pos = [0, 0] - self.value = [""] * self.rows - - @property - def _row_vars(self): - row = self.value[self.pos[1] + self.cursor[1]] - before = row[: self.pos[0] + self.cursor[0]] - after = row[self.pos[0] + self.cursor[0] :] - - return row, before, after - - def _kp_backspace(self) -> str: - _, before, after = self._row_vars - - if self.pos[0] <= 0 and self.cursor[0] == 0: - log.debug("at start of line, nothing to backspace") - return "" - - shift = False - dist = wcswidth(before[-1]) - self.value[self.pos[1] + self.cursor[1]] = before[:-1] + after - out = "" - - after = after[: min(wcswidth(after), self.columns - self.cursor[0])] - - if self.cursor[0] < dist: - self.pos[0] = max(0, self.pos[0] - dist) - self.cursor[0] = 0 - shift = True - else: - after += " " * dist - - if dist > 0: - out += self.term.move_left(dist) - - self.cursor[0] -= dist - - if shift: - out = self.redraw() - else: - out += self.color(after) - - if dist > 0: - out += self.term.move_left(dist) - - log.debug( - f"backspace {self.pos} {self.cursor} " - f"{self.value[self.pos[1] + self.cursor[1]]!r}" - ) - - return out - - def _kp_delete(self) -> str: - _, before, after = self._row_vars - - if self.pos[0] >= wcswidth(self.value[self.pos[1]]): - log.debug("at end of line, nothing to delete") - return "" - - after = after[1:] - dist = wcswidth(after) - self.value[self.pos[1] + self.cursor[1]] = before + after - after = after[: self.columns - self.cursor[0]] - out = "" - - if self.cursor[0] + dist <= self.columns - 1: - after += " " * (dist + 1) - out += self.term.move_left(dist + 1) - - log.debug(f'delete "{self.value[self.pos[1] + self.cursor[1]]}"') - - return self.color( - after + (self.term.move_left(dist) if dist > 0 else "") + out - ) - - def _horizontal(self, distance: int) -> str: - """ - Horizontal movement helper method; handles shifting visible area. - - Args: - distance: The distance to travel. Negative = left, positive = right. - - Returns: - The output sequence for updating the screen. - """ - - if distance == 0: - return "" - - if distance < 0 and self.cursor[0] <= 0 and self.pos[0] <= 0: - log.debug("already at start of line") - - return "" - - if distance > 0 and self.edge_diff >= 0: - log.debug("already at end of line") - - return "" - - curline = self.value[self.pos[1] + self.cursor[1]] - strlen = wcswidth(curline) - shift = False - move: Callable[..., str] - - new_cursor = self.cursor[0] + distance - abs_distance = abs(distance) - - if distance < 0: - if new_cursor < 0: - log.debug("shifting visible area left") - self.pos[0] = max( - 0, self.pos[0] + self.cursor[0] - abs_distance - ) - new_cursor = 0 - shift = True - - move = self.term.move_left - - else: - if new_cursor > self.columns - 1: - log.debug("shifting visible area right") - self.pos[0] = min( - strlen - self.columns + 1, self.pos[0] + abs_distance - ) - new_cursor = self.columns - 1 - shift = True - - move = self.term.move_right - - out: list[str] = [move(abs(self.cursor[0] - new_cursor))] - self.cursor[0] = new_cursor - - if shift: - out.append(self.redraw()) - - log.debug( - f"{'left' if distance < 0 else 'right'} {self.pos} {self.cursor}" - ) - - return "".join(out) - - def _vertical(self, distance: int) -> str: - """ - Vertical movement helper method; handles shifting visible area and - snapping the cursor to the edge of the destination row's text. - - Args: - distance: The distance to travel. Negative = up, positive = down. - - Returns: - The output sequence for updating the screen. - """ - - if distance == 0: - return "" - - if distance < 0 and self.cursor[1] <= 0 and self.pos[1] <= 0: - log.debug("already at start of editor") - - return "" - - if distance > 0 and self.pos[1] + self.cursor[1] >= len(self.value) - 1: - log.debug("already at end of editor") - - return "" - - snap_to_edge = self.edge_diff >= 0 - shift = False - vallen = len(self.value) - clamp_low = -(self.pos[1] + self.cursor[1]) - clamp_high = vallen - self.pos[1] - self.cursor[1] - log.debug(f"distance clamp: {clamp_low} - {clamp_high}") - distance = min(clamp_high, max(clamp_low, distance)) - new_cursor = self.cursor[1] + distance - log.debug(f"new cursor: {new_cursor}") - out = [] - - if distance < 0: - if new_cursor < 0: - log.debug("shifting visible area up") - new_cursor = 0 - shift = True - - move = self.term.move_up - - else: - if new_cursor >= self.rows: - log.debug("shifting visible area down") - new_cursor = self.rows - 1 - shift = True - - move = self.term.move_down - - if shift: - self.pos[1] = max( - 0, - min(self.pos[1] + distance, vallen - self.rows), - ) - - log.debug(f"clamped new cursor: {new_cursor}") - cursor_shift = abs(self.cursor[1] - new_cursor) - log.debug(f"cursor shift: {cursor_shift}") - - if shift and cursor_shift > 0: - out.append(move(cursor_shift)) - - self.cursor[1] = new_cursor - diff = self.edge_diff - - if not snap_to_edge and diff > 0: - log.debug("past end of line") - snap_to_edge = True - - if snap_to_edge: - log.debug("snapping to edge") - self.cursor[0] -= diff - - if self.cursor[0] < 0 or self.cursor[0] >= self.columns: - log.debug("edge is out of view") - strlen = wcswidth(self.value[self.pos[1] + self.cursor[1]]) - self.pos[0] = max(0, strlen - self.columns + 1) - self.cursor[0] = self.columns - 1 - - if not shift: - out.append(move(cursor_shift)) - - shift = True - - elif diff > 0: - out.append(self.term.move_left(diff)) - - elif diff < 0: - out.append(self.term.move_right(-diff)) - - out.append(self.redraw() if shift else move(cursor_shift)) - log.debug( - f"{'up' if distance < 0 else 'down'} {self.pos} {self.cursor}" - ) - - return "".join(out) - - def _kp_left(self) -> str: - char_behind: str - - try: - char_behind = self.value[self.pos[1] + self.cursor[1]][ - self.pos[0] + self.cursor[0] - 1 - ] - except IndexError: - char_behind = "" - - return self._horizontal(min(-1, -wcswidth(char_behind))) - - def _kp_right(self) -> str: - char_under: str - - try: - char_under = self.value[self.pos[1] + self.cursor[1]][ - self.pos[0] + self.cursor[0] - ] - except IndexError: - char_under = "" - - return self._horizontal(max(1, wcswidth(char_under))) - - def _kp_home(self) -> str: - return self._horizontal(-(self.pos[0] + self.cursor[0])) - - def _kp_end(self) -> str: - return self._horizontal(-self.edge_diff) - - def _kp_up(self) -> str: - return self._vertical(-1) - - def _kp_down(self) -> str: - return self._vertical(1) - - def _kp_pgup(self) -> str: - return self._vertical(-(self.rows - 1)) - - def _kp_pgdown(self) -> str: - return self._vertical(self.rows - 1) - - def process_keystroke(self, ks: Keystroke) -> str: - """ - Process keystroke and produce output (if any). - - Args: - ks: The keystroke to process. - - Returns: - The output sequence for screen updates. - """ - - handlers = { - self.term.KEY_BACKSPACE: self._kp_backspace, - self.term.KEY_DELETE: self._kp_delete, - self.term.KEY_LEFT: self._kp_left, - self.term.KEY_RIGHT: self._kp_right, - self.term.KEY_HOME: self._kp_home, - self.term.KEY_END: self._kp_end, - } - - if self.rows > 1: - handlers = { - **handlers, - **{ - self.term.KEY_UP: self._kp_up, - self.term.KEY_DOWN: self._kp_down, - self.term.KEY_PGUP: self._kp_pgup, - self.term.KEY_PGDOWN: self._kp_pgdown, - }, - } - - # TODO general: tab - # TODO multiline: under/overflow, enter - - if ks.code in handlers: - return handlers[ks.code]() - - if ks.is_sequence: - log.debug(f"swallowing sequence {ks!r}") - return "" - - if self.limit[0] > 0 and self.pos[0] + self.cursor[0] >= self.limit[0]: - log.debug(f"reached text limit, discarding {ks!r}") - return "" - - ucs = str(ks) - - if not self.term.length(ucs): - ordinal = ord(ucs) - log.debug(f"zero length ucs: {hex(ordinal)}") - - # control characters; do not add to editor value - if 0 <= ordinal <= 31: - log.debug("discarding") - return "" - - # handle typed character - - _, before, after = self._row_vars - dist = wcswidth(ucs) - self.value[self.pos[1] + self.cursor[1]] = before + ucs + after - self.cursor[0] += dist - log.debug(f"{self.pos} {self.cursor} {self.value}") - - if not self.at_end and self.cursor[0] >= self.columns: - self.cursor[0] -= dist - self.pos[0] += dist - log.debug("shifting visible area to right") - - return self.redraw() - - after = after[: min(wcswidth(after), self.columns - self.cursor[0])] - afterlen = wcswidth(after) - move_left = ( - afterlen - if ( - self.pos[0] + self.cursor[0] - < wcswidth(self.value[self.pos[1] + self.cursor[1]]) - ) - else afterlen - 1 - ) - - out = [ucs, after] - - if move_left > 0: - out.append(self.term.move_left(move_left)) - - return self.color("".join(out)) diff --git a/xthulu/ssh/ui/editors/line.py b/xthulu/ssh/ui/editors/line.py deleted file mode 100644 index c46410d..0000000 --- a/xthulu/ssh/ui/editors/line.py +++ /dev/null @@ -1,39 +0,0 @@ -# stdlib -from typing import Literal - -# local -from ...terminal.proxy_terminal import ProxyTerminal -from .block import BlockEditor - - -class LineEditor(BlockEditor): - - """Line editor (single line)""" - - def __init__( - self, term: ProxyTerminal, columns: int, limit=0, *args, **kwargs - ): - """ - Line editor (single line) - - Args: - term: The terminal to use for generating output sequences. - columns: The width of the editor in columns. - limit: The maximum number of characters allowed in the editor. - args: Arguments passed to the underlying `BlockEditor` constructor. - kwargs: Arguments passed to the underlying `BlockEditor` - constructor. - """ - - super().__init__(term, 1, columns, limit=(limit, 1), *args, **kwargs) - - @property - def rows(self) -> Literal[1]: - """A line editor only has a single row.""" - - return 1 - - @rows.setter - def rows(self, val: int): - if val != 1: - raise ValueError("LineEditor must have exactly 1 row")