forked from levioctl/textual-switcher
-
Notifications
You must be signed in to change notification settings - Fork 0
/
tabcontrol.py
142 lines (120 loc) · 5.6 KB
/
tabcontrol.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import os
import json
import struct
import os.path
import unicodedata
import expiringdict
import glib_wrappers
from gi.repository import GLib, Gio
from gi.repository.GdkPixbuf import Pixbuf
class ApiProxyNotReady(Exception):
    """Raised when a named pipe of a browser's API proxy cannot be opened."""
class TabControl(object):
    """List and switch browser tabs through per-browser API proxy named pipes.

    For every browser process, two named pipes are expected under
    API_PROXY_NAMED_PIPES_DIR: one this class writes commands to and one it
    reads length-prefixed JSON replies from. Replies are delivered
    asynchronously through GLib I/O watches.
    """

    API_PROXY_NAMED_PIPES_DIR = os.path.join('/run', 'user', str(os.getuid()), "textual-switcher-proxy")
    # Pipe this process writes commands to; '%d' is the browser pid.
    OUT_PIPE_FILENAME = os.path.join(API_PROXY_NAMED_PIPES_DIR, "textual_switcher_to_api_proxy_for_firefox_pid_%d")
    # Pipe this process reads replies from; '%d' is the browser pid.
    IN_PIPE_FILENAME = os.path.join(API_PROXY_NAMED_PIPES_DIR, "api_proxy_to_textual_switcher_for_firefox_pid_%d")
    # Four weeks; used as the maximum age of cached tab icons.
    ONE_MONTH_IN_SECONDS = 60 * 60 * 24 * 7 * 4

    def __init__(self, update_tabs_callback, update_tab_icon_callback):
        """Create a controller with no open connections.

        update_tabs_callback is called as callback(pid, tabs) when a tab list
        arrives from the browser with that pid. update_tab_icon_callback is
        called as callback(url, contents) when an icon has been fetched and
        decoded successfully.
        """
        self._in_fds_by_browser_pid = dict()
        self._out_fds_by_browser_pid = dict()
        self._update_tabs_callback = update_tabs_callback
        self._update_tab_icon_callback = update_tab_icon_callback
        # Never assigned a meaningful value; kept so existing readers of the
        # attribute keep working.
        self._in_id = None
        self._icon_cache = expiringdict.ExpiringDict(max_len=100, max_age_seconds=self.ONE_MONTH_IN_SECONDS)

    def async_list_browser_tabs(self, active_browsers):
        """Request the tab list of every reachable browser in active_browsers.

        Each item of active_browsers must expose a `pid` attribute. Browsers
        whose proxy pipes cannot be opened are skipped. Results are reported
        later through update_tabs_callback. Descriptors of browsers that are
        no longer active are closed afterwards.
        """
        for browser in active_browsers:
            is_connected = self._validate_connection_to_browser(browser.pid)
            if not is_connected:
                continue
            self._activate_callback_for_one_message_from_api_proxy(browser.pid)
            self.send_list_tabs_command(browser.pid)
        self._clean_stale_descriptors(active_browsers)

    def async_move_to_tab(self, tab_id, pid):
        """Ask the browser with the given pid to switch to tab tab_id.

        Does nothing when the browser's proxy pipes cannot be opened.
        """
        is_connected = self._validate_connection_to_browser(pid)
        if is_connected:
            # Bytes literal: identical to a str on Python 2, required by
            # os.write on Python 3.
            command = b'move_to_tab:%d;' % (tab_id,)
            os.write(self._out_fds_by_browser_pid[pid], command)

    def _validate_connection_to_browser(self, pid):
        """Return True if pipes to the browser are open, connecting if needed."""
        if not self._is_connected_to_api_proxy_for_browser_pid(pid):
            try:
                self._connect_to_api_for_browser_pid(pid)
            except ApiProxyNotReady:
                return False
        return True

    def _activate_callback_for_one_message_from_api_proxy(self, pid):
        """Watch the browser's input pipe for readable data.

        The callback returns a false value, which makes GLib remove the watch
        after it has fired once.
        """
        in_fd = self._in_fds_by_browser_pid[pid]
        GLib.io_add_watch(in_fd, GLib.IO_IN, self._receive_message_from_api_proxy)

    def _is_connected_to_api_proxy_for_browser_pid(self, pid):
        """Return True if both the input and the output pipe are open for pid."""
        return pid in self._in_fds_by_browser_pid and pid in self._out_fds_by_browser_pid

    def _connect_to_api_for_browser_pid(self, pid):
        """Open both named pipes of the browser's API proxy.

        Raises ApiProxyNotReady if either pipe cannot be opened. The
        descriptors are registered only once both opens succeeded, so a
        failed attempt never leaves a half-open connection behind.
        """
        in_pipe_filename = self.IN_PIPE_FILENAME % (pid,)
        try:
            in_fd = os.open(in_pipe_filename, os.O_RDONLY | os.O_NONBLOCK)
        except OSError:
            raise ApiProxyNotReady(pid)
        out_pipe_filename = self.OUT_PIPE_FILENAME % (pid,)
        try:
            out_fd = os.open(out_pipe_filename, os.O_WRONLY | os.O_NONBLOCK)
        except OSError:
            # Do not leak the input descriptor: a retry would open it again.
            os.close(in_fd)
            raise ApiProxyNotReady(pid)
        self._in_fds_by_browser_pid[pid] = in_fd
        self._out_fds_by_browser_pid[pid] = out_fd

    def _receive_message_from_api_proxy(self, fd, cond):
        """GLib I/O callback: read one tab list from fd and report it.

        fd is the readable descriptor; cond is the triggering I/O condition
        and is not used. Always returns False so that the watch is removed
        after a single invocation.
        """
        pids = [pid for pid, in_fd in self._in_fds_by_browser_pid.items() if fd == in_fd]
        if len(pids) != 1:
            print('invalid browser pid %s' % (pids,))
            return False
        pid = pids[0]
        content = self._read_from_api_proxy(fd)
        if content is None:
            return False
        try:
            tabs = json.loads(content)
        except ValueError as ex:
            # A malformed payload must not raise out of the main loop callback.
            print('Invalid message from API proxy of browser pid %s: %s' % (pid, ex))
            return False
        self._populate_tabs_icons(tabs)
        self._update_tabs_callback(pid, tabs)
        return False

    def _populate_tabs_icons(self, tabs):
        """Set tab['icon'] for every tab, fetching icons that are not cached."""
        for tab in tabs:
            tab['icon'] = self.get_tab_icon(tab, fetch_if_missing=True)

    def get_tab_icon(self, tab, fetch_if_missing=False):
        """Return the cached icon for the tab's 'favIconUrl', or None.

        None is returned when the tab has no icon URL, when the icon is not
        cached, or when a fetch is still pending. If the URL is not cached
        and fetch_if_missing is true, an asynchronous fetch is started and
        the URL is cached as None so it is not requested twice.
        """
        url = tab.get('favIconUrl')
        if url is None:
            return None
        if url in self._icon_cache:
            return self._icon_cache[url]
        if fetch_if_missing:
            self._icon_cache[url] = None
            glib_wrappers.async_get_url(url, self._tab_icon_ready)
        return None

    def _tab_icon_ready(self, url, contents):
        """Decode fetched icon data, cache the pixbuf and notify the listener.

        If the data cannot be decoded the error is printed and the listener
        is not called.
        """
        input_stream = Gio.MemoryInputStream.new_from_data(contents, None)
        try:
            pixbuf = Pixbuf.new_from_stream(input_stream, None)
        except GLib.Error as ex:
            print("Error fetching icon for URL '%s': %s" % (url, ex.message))
            return
        self._icon_cache[url] = pixbuf
        self._update_tab_icon_callback(url, contents)

    @staticmethod
    def _read_from_api_proxy(fd):
        """Read one length-prefixed message from fd and return its payload.

        A message is a native unsigned int (struct format '@I') holding the
        payload length, followed by the payload. Returns None when nothing
        can be read, when the header is incomplete, or when the complete
        payload is not available.
        """
        try:
            message = os.read(fd, 4096)
        except OSError:
            # E.g. EAGAIN on a non-blocking pipe with no data.
            return None
        raw_length = message[:4]
        if len(raw_length) < 4:
            return None
        length = struct.unpack('@I', raw_length)[0]
        payload = message[4:4 + length]
        # Payloads larger than the first read are collected with further reads.
        while len(payload) < length:
            try:
                chunk = os.read(fd, length - len(payload))
            except OSError:
                break
            if not chunk:
                break
            payload += chunk
        if len(payload) != length:
            return None
        return payload

    def send_list_tabs_command(self, pid):
        """Write the 'list_tabs;' command to the output pipe of the browser."""
        os.write(self._out_fds_by_browser_pid[pid], b'list_tabs;')

    def _clean_stale_descriptors(self, active_browsers):
        """Close and forget descriptors of browsers not in active_browsers."""
        active_pids = set(browser.pid for browser in active_browsers)
        known_pids = set(self._in_fds_by_browser_pid) | set(self._out_fds_by_browser_pid)
        stale_pids = [pid for pid in known_pids if pid not in active_pids]
        # NOTE(review): a GLib watch may still be pending on a closed input
        # descriptor; watch ids are not tracked here.
        self._clean_fds_for_browser_pid(stale_pids, self._in_fds_by_browser_pid)
        self._clean_fds_for_browser_pid(stale_pids, self._out_fds_by_browser_pid)

    def _clean_fds_for_browser_pid(self, browser_pids, pid_to_fd):
        """Close and remove the pid_to_fd entries of the given browser pids.

        Pids that are not in pid_to_fd are ignored, and so are errors raised
        while closing a descriptor.
        """
        for pid in browser_pids:
            if pid not in pid_to_fd:
                continue
            fd = pid_to_fd.pop(pid)
            try:
                os.close(fd)
            except OSError:
                pass