SafeEyes/safeeyes/utility.py

670 lines
22 KiB
Python
Raw Normal View History

2017-10-07 15:10:31 +02:00
#!/usr/bin/env python
# Safe Eyes is a utility to remind you to take break frequently
# to protect your eyes from eye strain.
# Copyright (C) 2017 Gobinath
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
2017-10-07 15:10:31 +02:00
"""
This module contains utility functions for Safe Eyes and its plugins.
"""
import errno
import imp
2020-04-15 23:52:14 +02:00
import inspect
import importlib
2017-10-07 15:10:31 +02:00
import json
import locale
import logging
import os
import re
import sys
2017-10-07 15:10:31 +02:00
import shutil
import subprocess
import threading
from distutils.version import LooseVersion
from logging.handlers import RotatingFileHandler
2020-12-26 01:22:57 +01:00
from pathlib import Path
import babel.core
2017-10-07 15:10:31 +02:00
import babel.dates
2017-02-06 03:09:04 +01:00
import gi
2017-10-07 15:10:31 +02:00
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GLib
2019-01-17 00:15:31 +01:00
from gi.repository import GdkPixbuf
2017-10-07 15:10:31 +02:00
2017-02-06 03:09:04 +01:00
gi.require_version('Gdk', '3.0')
2017-10-07 15:10:31 +02:00
BIN_DIRECTORY = os.path.dirname(os.path.realpath(__file__))
2017-10-14 16:03:39 +02:00
HOME_DIRECTORY = os.environ.get('HOME') or os.path.expanduser('~')
2018-06-23 00:22:14 +02:00
CONFIG_DIRECTORY = os.path.join(os.environ.get(
'XDG_CONFIG_HOME') or os.path.join(HOME_DIRECTORY, '.config'), 'safeeyes')
2017-10-07 15:10:31 +02:00
CONFIG_FILE_PATH = os.path.join(CONFIG_DIRECTORY, 'safeeyes.json')
CONFIG_RESOURCE = os.path.join(CONFIG_DIRECTORY, 'resource')
SESSION_FILE_PATH = os.path.join(CONFIG_DIRECTORY, 'session.json')
STYLE_SHEET_PATH = os.path.join(CONFIG_DIRECTORY, 'style/safeeyes_style.css')
SYSTEM_CONFIG_FILE_PATH = os.path.join(BIN_DIRECTORY, "config/safeeyes.json")
2018-06-23 00:22:14 +02:00
SYSTEM_STYLE_SHEET_PATH = os.path.join(
BIN_DIRECTORY, "config/style/safeeyes_style.css")
2017-12-07 00:45:20 +01:00
LOG_FILE_PATH = os.path.join(HOME_DIRECTORY, 'safeeyes.log')
2017-10-07 15:10:31 +02:00
SYSTEM_PLUGINS_DIR = os.path.join(BIN_DIRECTORY, 'plugins')
USER_PLUGINS_DIR = os.path.join(CONFIG_DIRECTORY, 'plugins')
LOCALE_PATH = os.path.join(BIN_DIRECTORY, 'config/locale')
2020-12-26 01:22:57 +01:00
SYSTEM_DESKTOP_FILE = os.path.join(BIN_DIRECTORY, "platform/safeeyes.desktop")
SYSTEM_ICONS = os.path.join(BIN_DIRECTORY, "platform/icons")
2017-10-07 15:10:31 +02:00
DESKTOP_ENVIRONMENT = None
IS_WAYLAND = False
2017-02-06 03:09:04 +01:00
2019-01-17 00:15:31 +01:00
2017-04-09 02:29:51 +02:00
def get_resource_path(resource_name):
2017-10-07 15:10:31 +02:00
"""
Return the user-defined resource if a system resource is overridden by the user.
Otherwise, return the system resource. Return None if the specified resource does not exist.
"""
if resource_name is None:
return None
resource_location = os.path.join(CONFIG_RESOURCE, resource_name)
if not os.path.isfile(resource_location):
2018-06-23 00:22:14 +02:00
resource_location = os.path.join(
BIN_DIRECTORY, 'resource', resource_name)
2017-10-07 15:10:31 +02:00
if not os.path.isfile(resource_location):
# Resource not found
resource_location = None
return resource_location
2017-02-06 03:09:04 +01:00
2017-02-16 00:29:14 +01:00
def start_thread(target_function, **args):
2017-10-07 15:10:31 +02:00
"""
Execute the function in a separate thread.
"""
thread = threading.Thread(target=target_function, name="WorkThread", daemon=False, kwargs=args)
2017-10-07 15:10:31 +02:00
thread.start()
2017-02-06 03:09:04 +01:00
# def execute_main_thread(target_function, args=None):
# """
# Execute the given function in main thread.
# """
# if args:
# GLib.idle_add(lambda: target_function(args))
# else:
# GLib.idle_add(target_function)
def execute_main_thread(target_function, arg1=None, arg2=None):
2017-10-07 15:10:31 +02:00
"""
Execute the given function in main thread.
"""
if arg1 is not None and arg2 is not None:
GLib.idle_add(lambda: target_function(arg1, arg2))
elif arg1 is not None:
GLib.idle_add(lambda: target_function(arg1))
elif arg2 is not None:
GLib.idle_add(lambda: target_function(arg2))
2017-10-07 15:10:31 +02:00
else:
GLib.idle_add(target_function)
def system_locale(category=locale.LC_MESSAGES):
2017-10-07 15:10:31 +02:00
"""
Return the system locale. If not available, return en_US.UTF-8.
"""
try:
locale.setlocale(locale.LC_ALL, '')
sys_locale = locale.getlocale(category)[0]
2017-10-07 15:10:31 +02:00
if not sys_locale:
sys_locale = 'en_US.UTF-8'
return sys_locale
except BaseException:
# Some systems does not return proper locale
return 'en_US.UTF-8'
2017-03-13 14:30:44 +01:00
def format_time(time):
2017-10-07 15:10:31 +02:00
"""
Format time based on the system time.
"""
sys_locale = system_locale(locale.LC_TIME)
try:
return babel.dates.format_time(time, format='short', locale=sys_locale)
except babel.core.UnknownLocaleError:
# Some locale types are not supported by the babel library.
# Use 'en' locale format if the system locale is not supported.
return babel.dates.format_time(time, format='short', locale='en')
2017-02-10 02:45:49 +01:00
def mkdir(path):
2017-10-07 15:10:31 +02:00
"""
Create directory if not exists.
"""
try:
os.makedirs(path)
except OSError as exc:
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else:
logging.error('Error while creating ' + str(path))
raise
def load_json(json_path):
"""
Load the JSON file from the given path.
"""
json_obj = None
if os.path.isfile(json_path):
try:
with open(json_path) as config_file:
json_obj = json.load(config_file)
except BaseException:
pass
return json_obj
def write_json(json_path, json_obj):
"""
Write the JSON object at the given path
"""
try:
with open(json_path, 'w') as json_file:
json.dump(json_obj, json_file, indent=4, sort_keys=True)
except BaseException:
pass
def delete(file_path):
"""
Delete the given file or directory
"""
try:
os.remove(file_path)
except OSError:
pass
def check_plugin_dependencies(plugin_id, plugin_config, plugin_settings, plugin_path):
2017-10-07 15:10:31 +02:00
"""
Check the plugin dependencies.
"""
# Check the desktop environment
if plugin_config['dependencies']['desktop_environments']:
# Plugin has restrictions on desktop environments
if DESKTOP_ENVIRONMENT not in plugin_config['dependencies']['desktop_environments']:
return _('Plugin does not support %s desktop environment') % DESKTOP_ENVIRONMENT
# Check the Python modules
for module in plugin_config['dependencies']['python_modules']:
if not module_exist(module):
return _("Please install the Python module '%s'") % module
# Check the shell commands
for command in plugin_config['dependencies']['shell_commands']:
if not command_exist(command):
return _("Please install the command-line tool '%s'") % command
# Check the resources
for resource in plugin_config['dependencies']['resources']:
if get_resource_path(resource) is None:
return _('Please add the resource %(resource)s to %(config_resource)s directory') % {'resource': resource, 'config_resource': CONFIG_RESOURCE}
plugin_dependency_checker = os.path.join(plugin_path, 'dependency_checker.py')
if os.path.isfile(plugin_dependency_checker):
dependency_checker = importlib.import_module((plugin_id + '.dependency_checker'))
if dependency_checker and hasattr(dependency_checker, "validate"):
return dependency_checker.validate(plugin_config, plugin_settings)
2017-10-07 15:10:31 +02:00
return None
def load_plugins_config(safeeyes_config):
"""
Load all the plugins from the given directory.
"""
configs = []
for plugin in safeeyes_config.get('plugins'):
plugin_path = os.path.join(SYSTEM_PLUGINS_DIR, plugin['id'])
if not os.path.isdir(plugin_path):
# User plugin
plugin_path = os.path.join(USER_PLUGINS_DIR, plugin['id'])
plugin_config_path = os.path.join(plugin_path, 'config.json')
plugin_icon_path = os.path.join(plugin_path, 'icon.png')
plugin_module_path = os.path.join(plugin_path, 'plugin.py')
if not os.path.isfile(plugin_module_path):
return
icon = None
if os.path.isfile(plugin_icon_path):
icon = plugin_icon_path
else:
icon = get_resource_path('ic_plugin.png')
config = load_json(plugin_config_path)
if config is None:
continue
dependency_description = check_plugin_dependencies(plugin['id'], config, plugin.get('settings', {}), plugin_path)
2017-10-07 15:10:31 +02:00
if dependency_description:
plugin['enabled'] = False
config['error'] = True
config['meta']['description'] = dependency_description
icon = get_resource_path('ic_warning.png')
else:
config['error'] = False
config['id'] = plugin['id']
config['icon'] = icon
config['enabled'] = plugin['enabled']
for setting in config['settings']:
setting['safeeyes_config'] = plugin['settings']
configs.append(config)
return configs
2017-04-08 15:30:44 +02:00
2017-06-19 17:23:28 +02:00
def desktop_environment():
2017-10-07 15:10:31 +02:00
"""
Detect the desktop environment.
"""
global DESKTOP_ENVIRONMENT
desktop_session = os.environ.get('DESKTOP_SESSION')
current_desktop = os.environ.get('XDG_CURRENT_DESKTOP')
env = 'unknown'
if desktop_session is not None:
desktop_session = desktop_session.lower()
if desktop_session in ['gnome', 'unity', 'budgie-desktop', 'cinnamon', 'mate', 'xfce4', 'lxde', 'pantheon', 'fluxbox', 'blackbox', 'openbox', 'icewm', 'jwm', 'afterstep', 'trinity', 'kde']:
env = desktop_session
elif desktop_session.startswith('xubuntu') or (current_desktop is not None and 'xfce' in current_desktop):
env = 'xfce'
elif desktop_session.startswith('lubuntu'):
env = 'lxde'
elif 'plasma' in desktop_session or desktop_session.startswith('kubuntu') or os.environ.get('KDE_FULL_SESSION') == 'true':
env = 'kde'
elif os.environ.get('GNOME_DESKTOP_SESSION_ID'):
env = 'gnome'
elif desktop_session.startswith('ubuntu'):
env = 'unity'
DESKTOP_ENVIRONMENT = env
return env
def is_wayland():
"""
Determine if Wayland is running
https://unix.stackexchange.com/a/325972/222290
"""
global IS_WAYLAND
try:
session_id = subprocess.check_output(['loginctl']).split(b'\n')[1].split()[0]
output = subprocess.check_output(
['loginctl', 'show-session', session_id, '-p', 'Type']
)
except BaseException:
logging.warning('Unable to determine if wayland is running. Assuming no.')
IS_WAYLAND = False
else:
IS_WAYLAND = bool(re.search(b'wayland', output, re.IGNORECASE))
return IS_WAYLAND
2017-10-07 15:10:31 +02:00
def execute_command(command, args=[]):
"""
Execute the shell command without waiting for its response.
"""
if command:
command_to_execute = []
if isinstance(command, str):
command_to_execute.append(command)
else:
command_to_execute.extend(command)
if args:
command_to_execute.extend(args)
try:
subprocess.Popen(command_to_execute)
except BaseException:
logging.error('Error in executing the command ' + str(command))
2017-04-19 01:34:22 +02:00
def command_exist(command):
2017-10-07 15:10:31 +02:00
"""
Check whether the given command exist in the system or not.
"""
if shutil.which(command):
return True
return False
def module_exist(module):
"""
Check wther the given Python module exists or not.
"""
try:
imp.find_module(module)
return True
except ImportError:
return False
def merge_configs(new_config, old_config):
2017-10-07 15:10:31 +02:00
"""
Merge the values of old_config into the new_config.
"""
new_config = new_config.copy()
new_config.update(old_config)
return new_config
def initialize_safeeyes():
"""
Create the config file and style sheet in ~/.config/safeeyes directory.
"""
logging.info('Copy the config files to ~/.config/safeeyes')
style_dir_path = os.path.join(HOME_DIRECTORY, '.config/safeeyes/style')
2020-12-26 01:22:57 +01:00
# Remove the ~/.config/safeeyes/safeeyes.json file
delete(CONFIG_FILE_PATH)
2017-10-07 15:10:31 +02:00
# Create the ~/.config/safeeyes/style directory
mkdir(style_dir_path)
# Copy the safeeyes.json
shutil.copy2(SYSTEM_CONFIG_FILE_PATH, CONFIG_FILE_PATH)
os.chmod(CONFIG_FILE_PATH, 0o777)
2017-10-07 15:10:31 +02:00
# Copy the new style sheet
if not os.path.isfile(STYLE_SHEET_PATH):
shutil.copy2(SYSTEM_STYLE_SHEET_PATH, STYLE_SHEET_PATH)
os.chmod(STYLE_SHEET_PATH, 0o777)
2017-10-07 15:10:31 +02:00
create_startup_entry()
def create_startup_entry():
"""
Create start up entry.
"""
startup_dir_path = os.path.join(HOME_DIRECTORY, '.config/autostart')
startup_entry = os.path.join(startup_dir_path, 'safeeyes.desktop')
# Create the folder if not exist
mkdir(startup_dir_path)
# Remove existing files
delete(startup_entry)
# Create the new startup entry
try:
os.symlink(SYSTEM_DESKTOP_FILE, startup_entry)
except OSError:
logging.error("Failed to create startup entry at %s" % startup_entry)
2020-12-26 01:22:57 +01:00
def initialize_platform():
"""
Copy icons and generate desktop entries.
"""
logging.debug("Initialize the platform")
applications_dir_path = os.path.join(HOME_DIRECTORY, '.local/share/applications')
icons_dir_path = os.path.join(HOME_DIRECTORY, '.local/share/icons')
desktop_entry = os.path.join(applications_dir_path, 'safeeyes.desktop')
# Create the folder if not exist
2020-12-26 01:22:57 +01:00
mkdir(icons_dir_path)
# Create a desktop entry
if not os.path.exists(os.path.join(sys.prefix, "share/applications/safeeyes.desktop")):
# Create the folder if not exist
mkdir(applications_dir_path)
# Remove existing file
delete(desktop_entry)
# Create a link
try:
os.symlink(SYSTEM_DESKTOP_FILE, desktop_entry)
except OSError:
logging.error("Failed to create desktop entry at %s" % desktop_entry)
2020-12-26 01:22:57 +01:00
# Add links for all icons
for (path, _, filenames) in os.walk(SYSTEM_ICONS):
for filename in filenames:
system_icon = os.path.join(path, filename)
local_icon = os.path.join(icons_dir_path, os.path.relpath(system_icon, SYSTEM_ICONS))
global_icon = os.path.join(sys.prefix, "share/icons", os.path.relpath(system_icon, SYSTEM_ICONS))
2020-12-26 01:22:57 +01:00
parent_dir = str(Path(local_icon).parent)
if os.path.exists(global_icon):
# This icon is already added to the /usr/share/icons/hicolor folder
continue
2020-12-26 01:22:57 +01:00
# Create the directory if not exists
mkdir(parent_dir)
# Remove the link if already exists
delete(local_icon)
# Add a link for the icon
try:
os.symlink(system_icon, local_icon)
except OSError:
logging.error("Failed to create icon link at %s" % local_icon)
2017-10-07 15:10:31 +02:00
def reset_config():
# Remove the ~/.config/safeeyes/safeeyes.json and safeeyes_style.css
delete(CONFIG_FILE_PATH)
delete(STYLE_SHEET_PATH)
# Copy the safeeyes.json and safeeyes_style.css
shutil.copy2(SYSTEM_CONFIG_FILE_PATH, CONFIG_FILE_PATH)
shutil.copy2(SYSTEM_STYLE_SHEET_PATH, STYLE_SHEET_PATH)
# Add write permission (e.g. if original file was stored in /nix/store)
os.chmod(CONFIG_FILE_PATH, 0o777)
os.chmod(STYLE_SHEET_PATH, 0o777)
create_startup_entry()
2020-12-26 01:22:57 +01:00
2019-01-19 18:25:19 +01:00
def replace_style_sheet():
"""
Replace the user style sheet by system style sheet.
"""
delete(STYLE_SHEET_PATH)
shutil.copy2(SYSTEM_STYLE_SHEET_PATH, STYLE_SHEET_PATH)
os.chmod(STYLE_SHEET_PATH, 0o777)
2019-01-19 18:25:19 +01:00
2017-10-07 15:10:31 +02:00
def intialize_logging(debug):
"""
Initialize the logging framework using the Safe Eyes specific configurations.
"""
# Configure logging.
root_logger = logging.getLogger()
2018-06-23 00:22:14 +02:00
log_formatter = logging.Formatter(
'%(asctime)s [%(levelname)s]:[%(threadName)s] %(message)s')
2017-10-07 15:10:31 +02:00
2017-12-07 00:45:20 +01:00
# Append the logs and overwrite once reached 1MB
2017-10-07 15:10:31 +02:00
if debug:
2017-12-07 00:45:20 +01:00
# Log to file
2018-06-23 00:22:14 +02:00
file_handler = RotatingFileHandler(
LOG_FILE_PATH, maxBytes=1024 * 1024, backupCount=5, encoding=None, delay=0)
2017-12-07 00:45:20 +01:00
file_handler.setFormatter(log_formatter)
# Log to console
2017-10-07 15:10:31 +02:00
console_handler = logging.StreamHandler()
console_handler.setFormatter(log_formatter)
2017-12-07 00:45:20 +01:00
root_logger.setLevel(logging.DEBUG)
2017-10-07 15:10:31 +02:00
root_logger.addHandler(console_handler)
2017-12-07 00:45:20 +01:00
root_logger.addHandler(file_handler)
2017-10-07 15:10:31 +02:00
else:
2017-12-07 00:45:20 +01:00
root_logger.propagate = False
2017-10-07 15:10:31 +02:00
def __open_plugin_config(plugins_dir, plugin_id):
"""
Open the given plugin's configuration.
"""
plugin_config_path = os.path.join(plugins_dir, plugin_id, 'config.json')
plugin_module_path = os.path.join(plugins_dir, plugin_id, 'plugin.py')
if not os.path.isfile(plugin_config_path) or not os.path.isfile(plugin_module_path):
# Either the config.json or plugin.py is not available
return None
return load_json(plugin_config_path)
def __update_plugin_config(plugin, plugin_config, config):
"""
Update the plugin configuration.
"""
if plugin_config is None:
config['plugins'].remove(plugin)
else:
2017-10-12 15:33:44 +02:00
if LooseVersion(plugin.get('version', '0.0.0')) != LooseVersion(plugin_config['meta']['version']):
2017-10-07 15:10:31 +02:00
# Update the configuration
plugin['version'] = plugin_config['meta']['version']
setting_ids = []
# Add the new settings
for setting in plugin_config['settings']:
setting_ids.append(setting['id'])
2018-06-23 00:22:14 +02:00
if 'settings' not in plugin:
plugin['settings'] = {}
2017-10-07 15:10:31 +02:00
if plugin['settings'].get(setting['id'], None) is None:
plugin['settings'][setting['id']] = setting['default']
# Remove the removed ids
keys_to_remove = []
2017-10-12 15:44:34 +02:00
for key in plugin.get('settings', []):
2017-10-07 15:10:31 +02:00
if key not in setting_ids:
keys_to_remove.append(key)
for key in keys_to_remove:
del plugin['settings'][key]
def __add_plugin_config(plugin_id, plugin_config, safe_eyes_config):
"""
"""
if plugin_config is None:
return
config = {}
config['id'] = plugin_id
config['enabled'] = False # By default plugins are disabled
config['version'] = plugin_config['meta']['version']
if plugin_config['settings']:
config['settings'] = {}
for setting in plugin_config['settings']:
config['settings'][setting['id']] = setting['default']
safe_eyes_config['plugins'].append(config)
def merge_plugins(config):
"""
Merge plugin configurations with Safe Eyes configuration.
"""
system_plugins = None
user_plugins = None
# Load system plugins id
if os.path.isdir(SYSTEM_PLUGINS_DIR):
system_plugins = os.listdir(SYSTEM_PLUGINS_DIR)
else:
system_plugins = []
# Load user plugins id
if os.path.isdir(USER_PLUGINS_DIR):
user_plugins = os.listdir(USER_PLUGINS_DIR)
else:
user_plugins = []
# Create a list of existing plugins
for plugin in config['plugins']:
plugin_id = plugin['id']
if plugin_id in system_plugins:
plugin_config = __open_plugin_config(SYSTEM_PLUGINS_DIR, plugin_id)
__update_plugin_config(plugin, plugin_config, config)
system_plugins.remove(plugin_id)
elif plugin_id in user_plugins:
plugin_config = __open_plugin_config(USER_PLUGINS_DIR, plugin_id)
__update_plugin_config(plugin, plugin_config, config)
user_plugins.remove(plugin_id)
else:
config['plugins'].remove(plugin)
# Add all system plugins
for plugin_id in system_plugins:
plugin_config = __open_plugin_config(SYSTEM_PLUGINS_DIR, plugin_id)
__add_plugin_config(plugin_id, plugin_config, config)
# Add all user plugins
for plugin_id in user_plugins:
plugin_config = __open_plugin_config(USER_PLUGINS_DIR, plugin_id)
__add_plugin_config(plugin_id, plugin_config, config)
def open_session():
"""
Open the last session.
"""
logging.info('Reading the session file')
session = load_json(SESSION_FILE_PATH)
if session is None:
session = {'plugin': {}}
return session
def create_gtk_builder(glade_file):
"""
Create a Gtk builder and load the glade file.
"""
builder = Gtk.Builder()
builder.set_translation_domain('safeeyes')
builder.add_from_file(glade_file)
# Tranlslate all sub components
for obj in builder.get_objects():
if (not isinstance(obj, Gtk.SeparatorMenuItem)) and hasattr(obj, "get_label"):
label = obj.get_label()
if label is not None:
obj.set_label(_(label))
elif hasattr(obj, "get_title"):
title = obj.get_title()
if title is not None:
obj.set_title(_(title))
return builder
2019-01-17 00:15:31 +01:00
def load_and_scale_image(path, width, height):
if not os.path.isfile(path):
return None
pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(
filename=path,
width=width,
height=height,
preserve_aspect_ratio=True)
image = Gtk.Image.new_from_pixbuf(pixbuf)
return image
2020-04-15 23:52:14 +02:00
def has_method(module, method_name, no_of_args=0):
"""
Check whether the given function is defined in the module or not.
"""
if hasattr(module, method_name):
if len(inspect.getargspec(getattr(module, method_name)).args) == no_of_args:
return True
return False
def remove_if_exists(list_of_items, item):
"""
Remove the item from the list_of_items it it exists.
"""
if item in list_of_items:
list_of_items.remove(item)