-
-
Notifications
You must be signed in to change notification settings - Fork 30.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
New tty/pty modules by Steen; new urlparser.
- Loading branch information
1 parent
c0af2aa
commit 23cb2a8
Showing
3 changed files
with
336 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
# pty.py -- Pseudo terminal utilities. | ||
|
||
# Bugs: No signal handling. Doesn't set slave termios and window size. | ||
# Only tested on Linux. | ||
# See: W. Richard Stevens. 1992. Advanced Programming in the | ||
# UNIX Environment. Chapter 19. | ||
# Author: Steen Lumholt -- with additions by Guido. | ||
|
||
from select import select | ||
import os, sys, FCNTL | ||
import tty | ||
|
||
STDIN_FILENO = 0 | ||
STDOUT_FILENO = 1 | ||
STDERR_FILENO = 2 | ||
|
||
CHILD = 0 | ||
|
||
# Open pty master. Returns (master_fd, tty_name). SGI and Linux/BSD version. | ||
def master_open(): | ||
try: | ||
import sgi | ||
except ImportError: | ||
pass | ||
else: | ||
try: | ||
tty_name, master_fd = sgi._getpty(FCNTL.O_RDWR, 0666, 0) | ||
except IOError, msg: | ||
raise os.error, msg | ||
return master_fd, tty_name | ||
for x in 'pqrstuvwxyzPQRST': | ||
for y in '0123456789abcdef': | ||
pty_name = '/dev/pty' + x + y | ||
try: | ||
fd = os.open(pty_name, FCNTL.O_RDWR) | ||
except os.error: | ||
continue | ||
return (fd, '/dev/tty' + x + y) | ||
raise os.error, 'out of pty devices' | ||
|
||
# Open the pty slave. Acquire the controlling terminal. | ||
# Returns file descriptor. Linux version. (Should be universal? --Guido) | ||
def slave_open(tty_name): | ||
return os.open(tty_name, FCNTL.O_RDWR) | ||
|
||
# Fork and make the child a session leader with a controlling terminal. | ||
# Returns (pid, master_fd) | ||
def fork(): | ||
master_fd, tty_name = master_open() | ||
pid = os.fork() | ||
if pid == CHILD: | ||
# Establish a new session. | ||
os.setsid() | ||
|
||
# Acquire controlling terminal. | ||
slave_fd = slave_open(tty_name) | ||
os.close(master_fd) | ||
|
||
# Slave becomes stdin/stdout/stderr of child. | ||
os.dup2(slave_fd, STDIN_FILENO) | ||
os.dup2(slave_fd, STDOUT_FILENO) | ||
os.dup2(slave_fd, STDERR_FILENO) | ||
if (slave_fd > STDERR_FILENO): | ||
os.close (slave_fd) | ||
|
||
# Parent and child process. | ||
return pid, master_fd | ||
|
||
# Write all the data to a descriptor. | ||
def writen(fd, data): | ||
while data != '': | ||
n = os.write(fd, data) | ||
data = data[n:] | ||
|
||
# Default read function. | ||
def read(fd): | ||
return os.read(fd, 1024) | ||
|
||
# Parent copy loop. | ||
# Copies | ||
# pty master -> standard output (master_read) | ||
# standard input -> pty master (stdin_read) | ||
def copy(master_fd, master_read=read, stdin_read=read): | ||
while 1: | ||
rfds, wfds, xfds = select( | ||
[master_fd, STDIN_FILENO], [], []) | ||
if master_fd in rfds: | ||
data = master_read(master_fd) | ||
os.write(STDOUT_FILENO, data) | ||
if STDIN_FILENO in rfds: | ||
data = stdin_read(STDIN_FILENO) | ||
writen(master_fd, data) | ||
|
||
# Create a spawned process. | ||
def spawn(argv, master_read=read, stdin_read=read): | ||
if type(argv) == type(''): | ||
argv = (argv,) | ||
pid, master_fd = fork() | ||
if pid == CHILD: | ||
apply(os.execlp, (argv[0],) + argv) | ||
mode = tty.tcgetattr(STDIN_FILENO) | ||
tty.setraw(STDIN_FILENO) | ||
try: | ||
copy(master_fd, master_read, stdin_read) | ||
except: | ||
tty.tcsetattr(STDIN_FILENO, tty.TCSAFLUSH, mode) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
# tty.py -- Terminal utilities. | ||
# Author: Steen Lumholt. | ||
|
||
from TERMIOS import * | ||
from termios import * | ||
|
||
# Indexes for termios list. | ||
IFLAG = 0 | ||
OFLAG = 1 | ||
CFLAG = 2 | ||
LFLAG = 3 | ||
ISPEED = 4 | ||
OSPEED = 5 | ||
CC = 6 | ||
|
||
# Put terminal into a raw mode. | ||
def setraw(fd, when=TCSAFLUSH): | ||
mode = tcgetattr(fd) | ||
mode[IFLAG] = mode[IFLAG] & ~(BRKINT | ICRNL | INPCK | ISTRIP | IXON) | ||
mode[OFLAG] = mode[OFLAG] & ~(OPOST) | ||
mode[CFLAG] = mode[CFLAG] & ~(CSIZE | PARENB) | ||
mode[CFLAG] = mode[CFLAG] | CS8 | ||
mode[LFLAG] = mode[LFLAG] & ~(ECHO | ICANON | IEXTEN | ISIG) | ||
mode[CC][VMIN] = 1 | ||
mode[CC][VTIME] = 0 | ||
tcsetattr(fd, when, mode) | ||
|
||
# Put terminal into a cbreak mode. | ||
def setcbreak(fd, when=TCSAFLUSH): | ||
mode = tcgetattr(fd) | ||
mode[LFLAG] = mode[LFLAG] & ~(ECHO | ICANON) | ||
mode[CC][VMIN] = 1 | ||
mode[CC][VTIME] = 0 | ||
tcsetattr(fd, when, mode) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,195 @@ | ||
# Parse (absolute and relative) URLs according to latest internet draft: | ||
|
||
# Uniform Resource Identifiers Working Group R. Fielding | ||
# INTERNET-DRAFT UC Irvine | ||
# Expires February 24, 1995 August 24, 1994 | ||
# | ||
# Relative Uniform Resource Locators | ||
# <draft-ietf-uri-relative-url-00.txt> | ||
|
||
# Standard/builtin Python modules | ||
import string | ||
|
||
# A classification of schemes ('' means apply by default) | ||
uses_relative = ['ftp', 'http', 'gopher', 'nntp', 'wais', 'file', | ||
'prospero', ''] | ||
uses_netloc = ['ftp', 'http', 'gopher', 'nntp', 'telnet', 'wais', | ||
'file', 'prospero', ''] | ||
non_hierarchical = ['gopher', 'mailto', 'news', 'telnet', 'wais'] | ||
uses_params = ['ftp', 'prospero', ''] | ||
uses_query = ['http', 'wais', ''] | ||
uses_fragment = ['ftp', 'http', 'gopher', 'news', 'nntp', 'wais', | ||
'file', 'prospero', ''] | ||
|
||
# Characters valid in scheme names | ||
scheme_chars = string.letters + string.digits + '+-.' | ||
|
||
# Parse a URL into 6 components: | ||
# <scheme>://<netloc>/<path>;<params>?<query>#<fragment> | ||
# Return a 6-tuple: (scheme, netloc, path, params, query, fragment). | ||
# Note that we don't break the components up in smaller bits | ||
# (e.g. netloc is a single string) and we don't expand % escapes. | ||
def urlparse(url, scheme = '', allow_framents = 1): | ||
netloc = '' | ||
path = '' | ||
params = '' | ||
query = '' | ||
fragment = '' | ||
i = string.find(url, ':') | ||
if i > 0: | ||
for c in url[:i]: | ||
if c not in scheme_chars: | ||
break | ||
else: | ||
scheme, url = string.lower(url[:i]), url[i+1:] | ||
if scheme in uses_netloc: | ||
if url[:2] == '//': | ||
i = string.find(url, '/', 2) | ||
if i < 0: | ||
i = len(url) | ||
netloc, url = url[2:i], url[i:] | ||
if allow_framents and scheme in uses_fragment: | ||
i = string.rfind(url, '#') | ||
if i >= 0: | ||
url, fragment = url[:i], url[i+1:] | ||
if scheme in uses_query: | ||
i = string.find(url, '?') | ||
if i >= 0: | ||
url, query = url[:i], url[i+1:] | ||
if scheme in uses_params: | ||
i = string.find(url, ';') | ||
if i >= 0: | ||
url, params = url[:i], url[i+1:] | ||
return scheme, netloc, url, params, query, fragment | ||
|
||
# Put a parsed URL back together again. This may result in a slightly | ||
# different, but equivalent URL, if the URL that was parsed originally | ||
# had redundant delimiters, e.g. a ? with an empty query (the draft | ||
# states that these are equivalent). | ||
def urlunparse((scheme, netloc, url, params, query, fragment)): | ||
if netloc: | ||
url = '//' + netloc + url | ||
if scheme: | ||
url = scheme + ':' + url | ||
if params: | ||
url = url + ';' + params | ||
if query: | ||
url = url + '?' + query | ||
if fragment: | ||
url = url + '#' + fragment | ||
return url | ||
|
||
# Join a base URL and a possibly relative URL to form an absolute | ||
# interpretation of the latter. | ||
def urljoin(base, url, allow_framents = 1): | ||
if not base: | ||
return url | ||
bscheme, bnetloc, bpath, bparams, bquery, bfragment = \ | ||
urlparse(base, '', allow_framents) | ||
scheme, netloc, path, params, query, fragment = \ | ||
urlparse(url, bscheme, allow_framents) | ||
if scheme != bscheme or scheme not in uses_relative: | ||
return urlunparse((scheme, netloc, path, | ||
params, query, fragment)) | ||
if scheme in uses_netloc: | ||
if netloc: | ||
return urlunparse((scheme, netloc, path, | ||
params, query, fragment)) | ||
netloc = bnetloc | ||
if path[:1] == '/': | ||
return urlunparse((scheme, netloc, path, | ||
params, query, fragment)) | ||
if not path: | ||
path = bpath | ||
if not query: | ||
query = bquery | ||
return urlunparse((scheme, netloc, path, | ||
params, query, fragment)) | ||
i = string.rfind(bpath, '/') | ||
if i < 0: | ||
i = len(bpath) | ||
path = bpath[:i] + '/' + path | ||
segments = string.splitfields(path, '/') | ||
if segments[-1] == '.': | ||
segments[-1] = '' | ||
while '.' in segments: | ||
segments.remove('.') | ||
while 1: | ||
i = 1 | ||
n = len(segments) - 1 | ||
while i < n: | ||
if segments[i] == '..' and segments[i-1]: | ||
del segments[i-1:i+1] | ||
break | ||
i = i+1 | ||
else: | ||
break | ||
if len(segments) >= 2 and segments[-1] == '..': | ||
segments[-2:] = [''] | ||
path = string.joinfields(segments, '/') | ||
return urlunparse((scheme, netloc, path, | ||
params, query, fragment)) | ||
|
||
test_input = """ | ||
http://a/b/c/d | ||
g:h = <URL:g:h> | ||
http:g = <URL:http://a/b/c/g> | ||
http: = <URL:http://a/b/c/d> | ||
g = <URL:http://a/b/c/g> | ||
./g = <URL:http://a/b/c/g> | ||
g/ = <URL:http://a/b/c/g/> | ||
/g = <URL:http://a/g> | ||
//g = <URL:http://g> | ||
?y = <URL:http://a/b/c/d?y> | ||
g?y = <URL:http://a/b/c/g?y> | ||
g?y/./x = <URL:http://a/b/c/g?y/./x> | ||
. = <URL:http://a/b/c/> | ||
./ = <URL:http://a/b/c/> | ||
.. = <URL:http://a/b/> | ||
../ = <URL:http://a/b/> | ||
../g = <URL:http://a/b/g> | ||
../.. = <URL:http://a/> | ||
../../g = <URL:http://a/g> | ||
../../../g = <URL:http://a/../g> | ||
./../g = <URL:http://a/b/g> | ||
./g/. = <URL:http://a/b/c/g/> | ||
/./g = <URL:http://a/./g> | ||
g/./h = <URL:http://a/b/c/g/h> | ||
g/../h = <URL:http://a/b/c/h> | ||
http:g = <URL:http://a/b/c/g> | ||
http: = <URL:http://a/b/c/d> | ||
""" | ||
|
||
def test(): | ||
import sys | ||
base = '' | ||
if sys.argv[1:]: | ||
fn = sys.argv[1] | ||
if fn == '-': | ||
fp = sys.stdin | ||
else: | ||
fp = open(fn) | ||
else: | ||
import StringIO | ||
fp = StringIO.StringIO(test_input) | ||
while 1: | ||
line = fp.readline() | ||
if not line: break | ||
words = string.split(line) | ||
if not words: | ||
continue | ||
url = words[0] | ||
parts = urlparse(url) | ||
print '%-10s : %s' % (url, parts) | ||
abs = urljoin(base, url) | ||
if not base: | ||
base = abs | ||
wrapped = '<URL:%s>' % abs | ||
print '%-10s = %s' % (url, wrapped) | ||
if len(words) == 3 and words[1] == '=': | ||
if wrapped != words[2]: | ||
print 'EXPECTED', words[2], '!!!!!!!!!!' | ||
|
||
if __name__ == '__main__': | ||
test() |