SafeEyes/safeeyes/Utility.py

456 lines
14 KiB
Python

# Safe Eyes is a utility to remind you to take break frequently
# to protect your eyes from eye strain.
# Copyright (C) 2017 Gobinath
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import gi
gi.require_version('Gdk', '3.0')
from gi.repository import Gdk, GLib
from html.parser import HTMLParser
from distutils.version import LooseVersion
from logging.handlers import RotatingFileHandler
import babel.dates, os, errno, re, subprocess, threading, logging, locale, json, shutil, pyaudio, wave
bin_directory = os.path.dirname(os.path.realpath(__file__))
home_directory = os.path.expanduser('~')
system_language_directory = os.path.join(bin_directory, 'config/lang')
config_directory = os.path.join(home_directory, '.config/safeeyes')
config_file_path = os.path.join(config_directory, 'safeeyes.json')
style_sheet_path = os.path.join(config_directory, 'style/safeeyes_style.css')
system_config_file_path = os.path.join(bin_directory, "config/safeeyes.json")
system_style_sheet_path = os.path.join(bin_directory, "config/style/safeeyes_style.css")
log_file_path = os.path.join(config_directory, 'safeeyes.log')
def play_notification():
"""
Play the alert.wav
"""
logging.info('Playing audible alert')
CHUNK = 1024
try:
# Open the sound file
path = get_resource_path('alert.wav')
if path is None:
return
sound = wave.open(path, 'rb')
# Create a sound stream
wrapper = pyaudio.PyAudio()
stream = wrapper.open(format=wrapper.get_format_from_width(sound.getsampwidth()),
channels=sound.getnchannels(),
rate=sound.getframerate(),
output=True)
# Write file data into the sound stream
data = sound.readframes(CHUNK)
while data != b'':
stream.write(data)
data = sound.readframes(CHUNK)
# Close steam
stream.stop_stream()
stream.close()
sound.close()
wrapper.terminate()
except Exception as e:
logging.warning('Unable to play audible alert')
logging.exception(e)
def get_resource_path(resource_name):
"""
Return the user-defined resource if a system resource is overridden by the user.
Otherwise, return the system resource. Return None if the specified resource does not exist.
"""
if resource_name is None:
return None
resource_location = os.path.join(config_directory, 'resource', resource_name)
if not os.path.isfile(resource_location):
resource_location = os.path.join(bin_directory, 'resource', resource_name)
if not os.path.isfile(resource_location):
logging.error('Resource not found: ' + resource_name)
resource_location = None
return resource_location
def system_idle_time():
"""
Get system idle time in minutes.
Return the idle time if xprintidle is available, otherwise return 0.
"""
try:
return int(subprocess.check_output(['xprintidle']).decode('utf-8')) / 60000 # Convert to minutes
except:
return 0
def start_thread(target_function, **args):
"""
Execute the function in a separate thread.
"""
thread = threading.Thread(target=target_function, kwargs=args)
thread.start()
def execute_main_thread(target_function, args=None):
"""
Execute the given function in main thread.
"""
if args:
GLib.idle_add(lambda: target_function(args))
else:
GLib.idle_add(lambda: target_function())
def is_active_window_skipped(skip_break_window_classes, take_break_window_classes, unfullscreen_allowed=False):
"""
Check for full-screen applications.
This method must be executed by the main thread. If not, it will cause to random failure.
"""
logging.info('Searching for full-screen application')
screen = Gdk.Screen.get_default()
active_window = screen.get_active_window()
if active_window:
active_xid = str(active_window.get_xid())
cmdlist = ['xprop', '-root', '-notype','-id',active_xid, 'WM_CLASS', '_NET_WM_STATE']
try:
stdout = subprocess.check_output(cmdlist).decode('utf-8')
except subprocess.CalledProcessError:
logging.warning('Error in finding full-screen application')
pass
else:
if stdout:
is_fullscreen = 'FULLSCREEN' in stdout
# Extract the process name
process_names = re.findall('"(.+?)"', stdout)
if process_names:
process = process_names[1].lower()
if process in skip_break_window_classes:
return True
elif process in take_break_window_classes:
if is_fullscreen and unfullscreen_allowed:
try:
active_window.unfullscreen()
except:
logging.error('Error in unfullscreen the window ' + process)
pass
return False
return is_fullscreen
return False
def __system_locale():
"""
Return the system locale. If not available, return en_US.UTF-8.
"""
locale.setlocale(locale.LC_ALL, '')
system_locale = locale.getlocale(locale.LC_TIME)[0]
if not system_locale:
system_locale = 'en_US.UTF-8'
return system_locale
def format_time(time):
"""
Format time based on the system time.
"""
system_locale = __system_locale()
return babel.dates.format_time(time, format='short', locale=system_locale)
def mkdir(path):
"""
Create directory if not exists.
"""
try:
os.makedirs(path)
except OSError as exc:
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else:
logging.error('Error while creating ' + str(path))
raise
def parse_language_code(lang_code):
"""
Convert the user defined language code to a valid one.
This includes converting to lower case and finding system locale language,
if the given lang_code code is 'system'.
"""
# Convert to lower case
lang_code = str(lang_code).lower()
# If it is system, use the system language
if lang_code == 'system':
logging.info('Use system language for Safe Eyes')
system_locale = __system_locale()
lang_code = system_locale[0:2].lower()
# Check whether translation is available for this language.
# If not available, use English by default.
language_file_path = os.path.join(system_language_directory, lang_code + '.json')
if not os.path.exists(language_file_path):
logging.warn('The language {} does not exist. Use English instead'.format(lang_code))
lang_code = 'en'
return lang_code
def load_language(lang_code):
"""
Load the desired language from the available list based on the preference.
"""
# Convert the user defined language code to a valid one
lang_code = parse_language_code(lang_code)
# Construct the translation file path
language_file_path = os.path.join(system_language_directory, lang_code + '.json')
language = None
# Read the language file and construct the json object
with open(language_file_path) as language_file:
language = json.load(language_file)
return language
def read_lang_files():
"""
Read all the language translations and build a key-value mapping of language names
in English and ISO 639-1 (Filename without extension).
"""
languages = {}
for lang_file_name in os.listdir(system_language_directory):
lang_file_path = os.path.join(system_language_directory, lang_file_name)
if os.path.isfile(lang_file_path):
with open(lang_file_path) as lang_file:
lang = json.load(lang_file)
languages[lang_file_name.lower().replace('.json', '')] = lang['meta_info']['language_name']
return languages
def lock_screen_command():
"""
Function tries to detect the screensaver command based on the current envinroment
Possible results:
Gnome, Unity, Budgie: ['gnome-screensaver-command', '--lock']
Cinnamon: ['cinnamon-screensaver-command', '--lock']
Pantheon, LXDE: ['light-locker-command', '--lock']
Mate: ['mate-screensaver-command', '--lock']
KDE: ['qdbus', 'org.freedesktop.ScreenSaver', '/ScreenSaver', 'Lock']
XFCE: ['xflock4']
Otherwise: None
"""
desktop_session = os.environ.get('DESKTOP_SESSION')
current_desktop = os.environ.get('XDG_CURRENT_DESKTOP')
if desktop_session is not None:
desktop_session = desktop_session.lower()
if ('xfce' in desktop_session or desktop_session.startswith('xubuntu') or (current_desktop is not None and 'xfce' in current_desktop)) and command_exist('xflock4'):
return ['xflock4']
elif desktop_session == 'cinnamon' and command_exist('cinnamon-screensaver-command'):
return ['cinnamon-screensaver-command', '--lock']
elif (desktop_session == 'pantheon' or desktop_session.startswith('lubuntu')) and command_exist('light-locker-command'):
return ['light-locker-command', '--lock']
elif desktop_session == 'mate' and command_exist('mate-screensaver-command'):
return ['mate-screensaver-command', '--lock']
elif desktop_session == 'kde' or 'plasma' in desktop_session or desktop_session.startswith('kubuntu') or os.environ.get('KDE_FULL_SESSION') == 'true':
return ['qdbus', 'org.freedesktop.ScreenSaver', '/ScreenSaver', 'Lock']
elif desktop_session in ['gnome','unity', 'budgie-desktop'] or desktop_session.startswith('ubuntu'):
if command_exist('gnome-screensaver-command'):
return ['gnome-screensaver-command', '--lock']
else:
# From Gnome 3.8 no gnome-screensaver-command
return ['dbus-send', '--type=method_call', '--dest=org.gnome.ScreenSaver', '/org/gnome/ScreenSaver', 'org.gnome.ScreenSaver.Lock']
elif os.environ.get('GNOME_DESKTOP_SESSION_ID'):
if not 'deprecated' in os.environ.get('GNOME_DESKTOP_SESSION_ID') and command_exist('gnome-screensaver-command'):
# Gnome 2
return ['gnome-screensaver-command', '--lock']
return None
def lock_desktop(command):
"""
Lock the screen using the predefined commands
"""
if command:
try:
subprocess.Popen(command)
except Exception as e:
logging.error('Error in executing the commad' + str(command) + ' to lock screen')
def html_to_text(html):
"""
Convert HTML to plain text
"""
extractor = __HTMLTextExtractor()
extractor.feed(html)
return extractor.get_data()
def command_exist(command):
"""
Check whether the given command exist in the system or not.
"""
if shutil.which(command):
return True
else:
return False
def merge_configs(new_config, old_config):
"""
Merge the values of old_config into the new_config.
"""
new_config = new_config.copy()
new_config.update(old_config)
return new_config
def __initialize_safeeyes():
"""
Create the config file and style sheet in ~/.config/safeeyes directory.
"""
logging.info('Copy the config files to ~/.config/safeeyes')
style_dir_path = os.path.join(home_directory, '.config/safeeyes/style')
startup_dir_path = os.path.join(home_directory, '.config/autostart')
# Remove the ~/.config/safeeyes directory
shutil.rmtree(config_directory, ignore_errors=True)
# Remove the startup file
try:
os.remove(os.path.join(home_directory, os.path.join(startup_dir_path, 'safeeyes.desktop')))
except:
pass
# Create the ~/.config/safeeyes/style directory
mkdir(style_dir_path)
mkdir(startup_dir_path)
# Copy the safeeyes.json
shutil.copy2(system_config_file_path, config_file_path)
# Copy the new startup file
try:
os.symlink("/usr/share/applications/safeeyes.desktop", os.path.join(startup_dir_path, 'safeeyes.desktop'))
except OSError as exc:
pass
# Copy the new style sheet
if not os.path.isfile(style_sheet_path):
shutil.copy2(system_style_sheet_path, style_sheet_path)
def intialize_logging():
"""
Initialize the logging framework using the Safe Eyes specific configurations.
"""
# Create the directory to store log file if not exist
if not os.path.exists(config_directory):
try:
os.makedirs(config_directory)
except:
pass
# Configure logging.
log_formatter = logging.Formatter('%(asctime)s [%(levelname)s]:[%(threadName)s] %(message)s')
# Apped the logs and overwrite once reached 5MB
handler = RotatingFileHandler(log_file_path, mode='a', maxBytes=5*1024*1024, backupCount=2, encoding=None, delay=0)
handler.setFormatter(log_formatter)
handler.setLevel(logging.INFO)
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(handler)
def read_config():
"""
Read the configuration from the config directory.
If does not exist or outdated by major version, copy the system config and
startup script to user directory.
If the user config is outdated by minor version, update the config by the new values.
"""
logging.info('Reading the configuration file')
if not os.path.isfile(config_file_path):
logging.info('Safe Eyes configuration file not found')
__initialize_safeeyes()
# Read the configurations
with open(config_file_path) as config_file:
user_config = json.load(config_file)
with open(system_config_file_path) as config_file:
system_config = json.load(config_file)
user_config_version = str(user_config['meta']['config_version'])
system_config_version = str(system_config['meta']['config_version'])
if LooseVersion(user_config_version) < LooseVersion(system_config_version):
# Outdated user config
logging.info('Update the old config version {} with new config version {}'.format(user_config_version, system_config_version))
user_config_major_version = user_config_version.split('.')[0]
system_config_major_version = system_config_version.split('.')[0]
if LooseVersion(user_config_major_version) < LooseVersion(system_config_major_version):
# Major version change
__initialize_safeeyes()
# Update the user_config
user_config = system_config
else:
# Minor version change
new_config = system_config.copy()
new_config.update(user_config)
# Update the version
new_config['meta']['config_version'] = system_config_version
# Write the configuration to file
with open(config_file_path, 'w') as config_file:
json.dump(new_config, config_file, indent=4, sort_keys=True)
# Update the user_config
user_config = new_config
return user_config
class __HTMLTextExtractor(HTMLParser):
"""
Helper class to convert HTML to text
"""
def __init__(self):
self.reset()
self.strict = False
self.convert_charrefs= True
self.fed = []
def handle_data(self, d):
self.fed.append(d)
def get_data(self):
return ''.join(self.fed)