Merge pull request #152 from quexten/feature/background-status

Add background status support
This commit is contained in:
Bernd Schoolmann 2024-04-05 08:08:15 +02:00 committed by GitHub
commit b9a2c1608d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 96 additions and 12 deletions

View File

@ -0,0 +1,20 @@
import os
import subprocess
is_flatpak = os.path.exists("/.flatpak-info")
def register_autostart(autostart: bool):
if is_flatpak:
print("Running in flatpak, registering with background portal for autostart.")
try:
subprocess.Popen(["python3", "/app/bin/src/linux/flatpak/autostart.py"], start_new_session=True)
except:
pass
def set_status(status: str):
if is_flatpak:
try:
subprocess.Popen(["python3", "/app/bin/src/linux/flatpak/status.py", status], start_new_session=True)
except:
pass

View File

@ -1,17 +1,16 @@
import gi
gi.require_version('Gtk', '4.0')
gi.require_version('Adw', '1')
import gc
from gi.repository import Gtk, Adw, GLib, Gio
from gi.repository import GLib, Gio
from random import randint
import time
import os
import sys
from threading import Timer
def receive_autostart(self, *args):
print("autostart enabled..!?")
print(args)
os._exit(0)
sys.exit(0)
def request_autostart():
bus = Gio.bus_get_sync(Gio.BusType.SESSION, None)

View File

@ -0,0 +1,37 @@
"""
Script to set the status of the background process.
Run separately so that gtk dependencies don't stick around in memory.
"""
import gi
gi.require_version('Gtk', '4.0')
gi.require_version('Adw', '1')
from gi.repository import GLib, Gio
import sys
def set_status(message):
bus = Gio.bus_get_sync(Gio.BusType.SESSION, None)
proxy = Gio.DBusProxy.new_sync(
bus,
Gio.DBusProxyFlags.NONE,
None,
'org.freedesktop.portal.Desktop',
'/org/freedesktop/portal/desktop',
'org.freedesktop.portal.Background',
None,
)
options = {
'message': GLib.Variant('s', message),
}
try:
request = proxy.SetStatus('(a{sv})', options)
sys.exit(0)
except Exception as e:
print(e)
if len(sys.argv) > 1:
set_status(sys.argv[1])
loop = GLib.MainLoop()
loop.run()

View File

@ -4,6 +4,7 @@ import subprocess
from tendo import singleton
from .monitors import dbus_autofill_monitor
from .monitors import dbus_monitor
from .monitors import locked_monitor
import sys
from src.services import goldwarden
from src.services import pinentry
@ -12,6 +13,7 @@ import os
import secrets
import time
import os
import src.linux.flatpak.api as flatpak_api
root_path = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), os.pardir, os.pardir))
is_flatpak = os.path.exists("/.flatpak-info")
@ -37,6 +39,7 @@ def main():
# start daemons
dbus_autofill_monitor.run_daemon(token) # todo: remove after migration
dbus_monitor.run_daemon(token)
locked_monitor.run_daemon(token)
pinentry.daemonize()
if not "--hidden" in sys.argv:
@ -44,13 +47,7 @@ def main():
p.stdin.write(f"{token}\n".encode())
p.stdin.flush()
if is_flatpak:
# to autostart the appes
try:
print("Enabling autostart...")
subprocess.Popen(["python3", "-m", "src.linux.background"], cwd=root_path, start_new_session=True)
except Exception as e:
pass
flatpak_api.register_autostart(True)
while True:
time.sleep(60)

View File

@ -0,0 +1,30 @@
from gi.repository import Gtk
import dbus
import dbus.service
from dbus.mainloop.glib import DBusGMainLoop
from threading import Thread
import subprocess
import os
from src.services import goldwarden
import time
import src.linux.flatpak.api as flatpak_api
daemon_token = None
def daemon():
time.sleep(5)
goldwarden.create_authenticated_connection(daemon_token)
while True:
status = goldwarden.get_vault_status()
if status["locked"]:
flatpak_api.set_status("Locked")
else:
flatpak_api.set_status("Unlocked")
time.sleep(1)
def run_daemon(token):
print("running locked status daemon")
global daemon_token
daemon_token = token
thread = Thread(target=daemon)
thread.start()

View File

@ -37,6 +37,7 @@ if BINARY_PATH is None:
authenticated_connection = None
def create_authenticated_connection(token):
print("create authenticated connection")
global authenticated_connection
authenticated_connection = subprocess.Popen([f"{BINARY_PATH}", "session"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if not token == None: