forked from levioctl/textual-switcher
-
Notifications
You must be signed in to change notification settings - Fork 0
/
glib_wrappers.py
57 lines (43 loc) · 1.76 KB
/
glib_wrappers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from gi.repository import GLib, Gio
def register_signal(callback, signal_type):
def register_signal():
GLib.idle_add(install_glib_handler, signal_type, priority=GLib.PRIORITY_HIGH)
def handler(*args):
signal_nr = args[0]
if signal_nr == signal_type:
callback()
register_signal()
def install_glib_handler(sig):
unix_signal_add = None
if hasattr(GLib, "unix_signal_add"):
unix_signal_add = GLib.unix_signal_add
elif hasattr(GLib, "unix_signal_add_full"):
unix_signal_add = GLib.unix_signal_add_full
if unix_signal_add is not None:
unix_signal_add(GLib.PRIORITY_HIGH, sig, handler, sig)
else:
print("Can't install GLib signal handler; gi version is too old.")
register_signal()
def async_run_subprocess(command):
pid, stdin, stdout, _ = \
GLib.spawn_async(command,
flags=GLib.SpawnFlags.SEARCH_PATH | GLib.SpawnFlags.DO_NOT_REAP_CHILD,
standard_output=True,
standard_error=True)
return stdout
def async_get_url(url, on_ready_callback):
cancellable = Gio.Cancellable()
def on_icon_ready_callback_wrapper(source_object, result, url):
try:
success, contents, etag = source_object.load_contents_finish(result)
except GLib.GError as e:
print("Error loading URL '%s': %s" % (url, e.message))
else:
if success:
on_ready_callback(url, contents)
else:
print("Error reading icon")
finally:
cancellable.reset()
file_ = Gio.File.new_for_uri(url)
file_.load_contents_async(cancellable, on_icon_ready_callback_wrapper, url)