1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
from collections.abc import Callable
from gi.repository import Astal, Gio
# Signature of a custom request handler: it receives the incoming message and
# a callback that accepts the response string to send back.
RequestHandler = Callable[[str, Callable[[str], None]], None]
class _Application(Astal.Application):
    """Astal application with an optional Python-side request handler.

    When ``request_handler`` is set, messages delivered to
    :meth:`do_response` are routed to it; otherwise the base-class
    implementation handles them.
    """

    def __init__(self) -> None:
        """Initialise the base application with no request handler set."""
        super().__init__()
        # Optional hook consulted by do_response(); None defers to the base class.
        self.request_handler: RequestHandler | None = None

    def do_response(self, msg: str, conn: Gio.SocketConnection) -> None:
        """Dispatch an incoming message to the request handler, if any.

        Args:
            msg: The message that was received.
            conn: The socket connection the response is written to.

        The handler is called with ``msg`` and a one-argument callback; calling
        that callback with a string writes it to ``conn`` via
        ``Astal.write_sock``. Without a handler the call is forwarded to the
        base-class ``do_response``.
        """
        if self.request_handler:
            self.request_handler(
                msg,
                lambda response: Astal.write_sock(
                    conn,
                    response,
                    # Finish the write when its completion callback fires.
                    lambda _, res: Astal.write_sock_finish(res),
                ),
            )
        else:
            super().do_response(msg, conn)

    def start(
        self,
        instance_name: str | None = None,
        gtk_theme: str | None = None,
        icon_theme: str | None = None,
        cursor_theme: str | None = None,
        css: str | None = None,
        hold: bool | None = True,
        request_handler: RequestHandler | None = None,
        callback: Callable | None = None,
    ) -> None:
        """Configure the application from the given options and run it.

        Every option is applied only when it is truthy; falsy values leave the
        corresponding setting untouched.

        Args:
            instance_name: Assigned to ``self.instance_name``.
            gtk_theme: Assigned to ``self.gtk_theme``.
            icon_theme: Assigned to ``self.icon_theme``.
            cursor_theme: Assigned to ``self.cursor_theme``.
            css: Passed to ``self.apply_css(css, False)``.
            hold: When truthy, ``self.hold()`` is called before running.
            request_handler: Stored as ``self.request_handler`` for use by
                :meth:`do_response`.
            callback: Zero-argument callable invoked whenever the ``activate``
                signal is emitted.

        If ``acquire_socket()`` returns a falsy value, a message naming the
        instance is printed and the method returns without running.
        """
        if request_handler:
            self.request_handler = request_handler
        if hold:
            self.hold()
        if instance_name:
            self.instance_name = instance_name
        if gtk_theme:
            self.gtk_theme = gtk_theme
        if icon_theme:
            self.icon_theme = icon_theme
        if cursor_theme:
            # Fixed: this previously assigned ``icon_theme`` by mistake, so the
            # requested cursor theme was never applied.
            self.cursor_theme = cursor_theme
        if css:
            self.apply_css(css, False)

        if not self.acquire_socket():
            print(f"Astal instance {self.instance_name} already running")
            return

        def on_activate(_app):
            # The emitting application is passed by the signal but not needed.
            if callback:
                callback()

        self.connect("activate", on_activate)
        self.run()
# Module-level application instance, created when this module is imported.
App = _Application()
|