Refactor the api and plugins

This commit is contained in:
Gobinath 2021-06-07 08:38:26 -04:00
parent c1b111d55c
commit cb69f3e1c9
28 changed files with 1358 additions and 1448 deletions

View File

@ -1 +1,8 @@
import os
SAFE_EYES_VERSION = "3.0.0"
SAFE_EYES_HOME_DIR = os.path.dirname(os.path.realpath(__file__))
SAFE_EYES_CONFIG_DIR = os.path.join(
os.environ.get('XDG_CONFIG_HOME') or os.path.join((os.environ.get('HOME') or os.path.expanduser('~')), '.config'),
'safeeyes')

View File

@ -29,11 +29,12 @@ from threading import Timer
import gi
import psutil
from safeeyes import SAFE_EYES_VERSION
from safeeyes import utility
from safeeyes.config import Config
from safeeyes.safeeyes import SafeEyes
from safeeyes import SAFE_EYES_VERSION
from safeeyes.rpc import RPCClient
from safeeyes.safeeyes import SafeEyes
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
@ -57,7 +58,8 @@ def __running():
else:
# In older versions cmdline was a list object
cmd_line = proc.cmdline
if ('python3' in cmd_line[0] or 'python' in cmd_line[0]) and ('safeeyes' in cmd_line[1] or 'safeeyes' in cmd_line):
if ('python3' in cmd_line[0] or 'python' in cmd_line[0]) and (
'safeeyes' in cmd_line[1] or 'safeeyes' in cmd_line):
process_count += 1
if process_count > 1:
return True
@ -72,23 +74,25 @@ def __evaluate_arguments(args, safe_eyes):
"""
Evaluate the arguments and execute the operations.
"""
if args.about:
utility.execute_main_thread(safe_eyes.show_about)
elif args.disable:
utility.execute_main_thread(safe_eyes.disable_safeeyes)
elif args.enable:
utility.execute_main_thread(safe_eyes.enable_safeeyes)
elif args.settings:
utility.execute_main_thread(safe_eyes.show_settings)
elif args.take_break:
utility.execute_main_thread(safe_eyes.take_break)
# if args.about:
# utility.execute_main_thread(safe_eyes.show_about)
# elif args.disable:
# utility.execute_main_thread(safe_eyes.disable_safeeyes)
# elif args.enable:
# utility.execute_main_thread(safe_eyes.enable_safeeyes)
# elif args.settings:
# utility.execute_main_thread(safe_eyes.show_settings)
# elif args.take_break:
# utility.execute_main_thread(safe_eyes.take_break)
pass
def main():
"""
Start the Safe Eyes.
"""
system_locale = gettext.translation('safeeyes', localedir=utility.LOCALE_PATH, languages=[utility.system_locale(), 'en_US'], fallback=True)
system_locale = gettext.translation('safeeyes', localedir=utility.LOCALE_PATH,
languages=[utility.system_locale(), 'en_US'], fallback=True)
system_locale.install()
# locale.bindtextdomain is required for Glade files
# gettext.bindtextdomain(gettext.textdomain(), Utility.LOCALE_PATH)
@ -97,13 +101,15 @@ def main():
parser = argparse.ArgumentParser(prog='safeeyes', description=_('description'))
group = parser.add_mutually_exclusive_group()
group.add_argument('-a', '--about', help=_('show the about dialog'), action='store_true')
group.add_argument('-d', '--disable', help=_('disable the currently running safeeyes instance'), action='store_true')
group.add_argument('-d', '--disable', help=_('disable the currently running safeeyes instance'),
action='store_true')
group.add_argument('-e', '--enable', help=_('enable the currently running safeeyes instance'), action='store_true')
group.add_argument('-q', '--quit', help=_('quit the running safeeyes instance and exit'), action='store_true')
group.add_argument('-s', '--settings', help=_('show the settings dialog'), action='store_true')
group.add_argument('-t', '--take-break', help=_('Take a break now').lower(), action='store_true')
parser.add_argument('--debug', help=_('start safeeyes in debug mode'), action='store_true')
parser.add_argument('--status', help=_('print the status of running safeeyes instance and exit'), action='store_true')
parser.add_argument('--status', help=_('print the status of running safeeyes instance and exit'),
action='store_true')
parser.add_argument('--version', action='version', version='%(prog)s ' + SAFE_EYES_VERSION)
args = parser.parse_args()
@ -151,5 +157,5 @@ def main():
if __name__ == '__main__':
signal.signal(signal.SIGINT, signal.SIG_DFL) # Handle Ctrl + C
signal.signal(signal.SIGINT, signal.SIG_DFL) # Handle Ctrl + C
main()

View File

@ -39,14 +39,23 @@ class BreakScheduler(BreakAPI):
self.__breaks_store = BreaksStore(context)
self.__timer = Timer(context, heartbeat, self.__start_break)
self.__plugins: PluginManager = plugin_mgr
self.__skipped: bool = False
self.__postponed: bool = False
def start(self):
def start(self, next_break_time: datetime.datetime = None):
if self.__breaks_store.is_empty():
return
self.__reset_stop_flags()
current_break = self.__breaks_store.get_break()
current_time = datetime.datetime.now()
waiting_time = current_break.waiting_time * 60
next_break_time = current_time + datetime.timedelta(seconds=waiting_time)
if current_break is None:
# This check is unnecessary
return
if next_break_time is None:
current_time = datetime.datetime.now()
waiting_time = current_break.waiting_time * 60
next_break_time = current_time + datetime.timedelta(seconds=waiting_time)
self.schedule(next_break_time)
@ -72,10 +81,15 @@ class BreakScheduler(BreakAPI):
self.start()
def skip(self):
self.__breaks_store.next()
self.start()
self.__skipped = True
self.next_break()
def schedule(self, next_break_time: datetime):
def postpone(self, duration: int) -> None:
self.__postponed = True
next_break_time = datetime.datetime.now() + datetime.timedelta(seconds=duration)
self.schedule(next_break_time)
def schedule(self, next_break_time: datetime.datetime):
if self.__breaks_store.is_empty():
return
with self.__heartbeat.lock:
@ -98,29 +112,14 @@ class BreakScheduler(BreakAPI):
self.__timer.schedule(next_break_time)
def __start_break(self):
print("Starting a break")
# BreakScheduler always call this method from a separate thread
with self.__heartbeat.lock:
self.__context.state = State.PRE_BREAK
break_obj = self.__breaks_store.get_break()
# Check if plugins want to cancel this break
if not self.__plugins.is_break_allowed(break_obj):
if self.__plugins.is_break_skipped(break_obj):
# Move to the next break
logging.info("Break '%s' is skipped by a plugin", break_obj.name)
self.__breaks_store.next()
self.start()
return
else:
postpone_time = self.__plugins.get_postpone_time(break_obj)
if postpone_time <= 0:
break_obj.reset_time()
postpone_time = break_obj.waiting_time * 60
logging.info("Break '%s' is postponed for %s seconds by a plugin", break_obj.name, postpone_time)
next_break_time = datetime.datetime.now() + datetime.timedelta(seconds=postpone_time)
self.schedule(next_break_time)
return
if not self.__is_break_allowed(break_obj):
return
# Send on_pre_break event
self.__plugins.on_pre_break(break_obj)
@ -136,12 +135,17 @@ class BreakScheduler(BreakAPI):
else:
self.__context.state = State.BREAK
self.__take_break(break_obj)
if self.__is_break_allowed(break_obj):
self.__take_break(break_obj)
def __take_break(self, break_obj: Break):
self.__plugins.on_start_break(break_obj)
self.__count_down(break_obj)
def __reset_stop_flags(self) -> None:
self.__skipped = False
self.__postponed = False
@worker
def __count_down(self, break_obj) -> None:
countdown = break_obj.duration
@ -155,8 +159,8 @@ class BreakScheduler(BreakAPI):
self.__condition.hold(1)
countdown -= 1
# TODO: Replace the hard coded boolean values
self.__plugins.on_stop_break(break_obj, False, False)
self.__plugins.on_stop_break(break_obj, self.__skipped, self.__postponed)
self.__reset_stop_flags()
with self.__heartbeat.lock:
if self.__context.state != State.BREAK:
# State changed while counting down
@ -164,3 +168,24 @@ class BreakScheduler(BreakAPI):
# Start the next break
self.next_break()
def __is_break_allowed(self, break_obj: Break) -> bool:
# Check if plugins want to cancel this break
action = self.__plugins.get_break_action(break_obj)
if action.not_allowed():
if action.skipped:
# Move to the next break
logging.info("Break '%s' is skipped by a plugin", break_obj.name)
self.__breaks_store.next()
self.start()
return True
else:
postpone_time = action.postpone_duration
if postpone_time <= 0:
break_obj.reset_time()
postpone_time = break_obj.waiting_time * 60
logging.info("Break '%s' is postponed for %s seconds by a plugin", break_obj.name, postpone_time)
next_break_time = datetime.datetime.now() + datetime.timedelta(seconds=postpone_time)
self.schedule(next_break_time)
return True
return False

View File

@ -35,7 +35,7 @@ class BreaksStore:
self.__long_queue: Queue = self.__build_longs()
# Restore the last break from session
self.__restore_last_break(context.get_session(SESSION_KEY_BREAK))
self.__restore_last_break(context.session.get(SESSION_KEY_BREAK))
def get_break(self, break_type=None) -> Optional[Break]:
if self.__current_break is None:
@ -75,7 +75,7 @@ class BreaksStore:
break_obj.reset_time()
self.__current_break = break_obj
self.__context.set_session(SESSION_KEY_BREAK, self.__current_break.name)
self.__context.session.set(SESSION_KEY_BREAK, self.__current_break.name)
return break_obj
@ -100,7 +100,7 @@ class BreaksStore:
def __next_short(self) -> Optional[Break]:
break_obj = self.__short_queue.next()
if break_obj is not None:
self.__context.set_session(SESSION_KEY_BREAK_TYPE, BreakType.SHORT)
self.__context.session.set(SESSION_KEY_BREAK_TYPE, BreakType.SHORT)
# Reduce the waiting time from the next long break
if not self.__long_queue.is_empty():
next_long_break = self.__long_queue.peek()
@ -110,7 +110,7 @@ class BreaksStore:
return break_obj
def __next_long(self) -> Optional[Break]:
self.__context.set_session(SESSION_KEY_BREAK_TYPE, BreakType.LONG)
self.__context.session.set(SESSION_KEY_BREAK_TYPE, BreakType.LONG)
return self.__long_queue.next()
def __restore_last_break(self, last_break: str) -> None:

View File

@ -15,42 +15,65 @@
# GNU General Public License for more details.
from typing import Any
from safeeyes import SAFE_EYES_VERSION
from safeeyes import SAFE_EYES_VERSION, utility
from safeeyes.config import Config
from safeeyes.spi.api import CoreAPI, BreakAPI, WindowAPI, PluginAPI
from safeeyes.env.desktop import DesktopEnvironment
from safeeyes.spi.api import CoreAPI, BreakAPI, WindowAPI, PluginAPI, ThreadAPI
from safeeyes.spi.state import State
from safeeyes.util.env import DesktopEnvironment
SESSION_KEY_BREAK = 'break'
SESSION_KEY_BREAK_TYPE = 'break_type'
class Session:
def __init__(self, read_from_disk: bool):
self.__session: dict = utility.open_session() if read_from_disk else {'plugin': {}}
def get_plugin(self, plugin_id: str) -> dict:
if plugin_id not in self.__session['plugin']:
self.__session['plugin'][plugin_id] = {}
return self.__session['plugin'][plugin_id]
def set_plugin(self, plugin_id: str, value: dict) -> None:
self.__session['plugin'][plugin_id] = value
def get(self, key: str, default: Any = None) -> Any:
return self.__session.get(key, default)
def set(self, key: str, value: Any) -> None:
self.__session[key] = value
def save(self, write_to_disk: bool) -> None:
if write_to_disk:
utility.write_json(utility.SESSION_FILE_PATH, self.__session)
else:
utility.delete(utility.SESSION_FILE_PATH)
class Context:
def __init__(self, config: Config, locale):
self.version: str = SAFE_EYES_VERSION
self.config: Config = config
self.locale = locale
self.session: dict = config.get_session()
self.session: Session = Session(config.get('persist_state', False))
self.state = State.START
self.__settings_dialog_visible = False
self.__env: DesktopEnvironment = DesktopEnvironment.get_env()
self.env: DesktopEnvironment = DesktopEnvironment.get_env()
self.core_api: CoreAPI = None
self.thread_api: ThreadAPI = None
self.break_api: BreakAPI = None
self.window_api: WindowAPI = None
self.plugin_api: PluginAPI = None
def set_apis(self, core_api: CoreAPI, window_api: WindowAPI, break_api: BreakAPI, plugin_api: PluginAPI) -> None:
def set_apis(self, core_api: CoreAPI,
thread_api: ThreadAPI,
window_api: WindowAPI,
break_api: BreakAPI,
plugin_api: PluginAPI) -> None:
self.core_api = core_api
self.thread_api = thread_api
self.break_api = break_api
self.plugin_api = plugin_api
self.window_api = window_api
def env(self) -> DesktopEnvironment:
return self.__env
def set_session(self, key: str, value: Any) -> None:
self.session[key] = value
def get_session(self, key: str) -> Any:
return self.session.get(key, None)

View File

@ -14,37 +14,43 @@
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
import importlib
import inspect
import logging
import os
import sys
from typing import List, Any, Dict
from safeeyes import utility
from safeeyes import utility, SAFE_EYES_HOME_DIR, SAFE_EYES_CONFIG_DIR
from safeeyes.config import Config
from safeeyes.context import Context
from safeeyes.env import system
from safeeyes.plugin_utils.proxy import PluginProxy
from safeeyes.utility import DESKTOP_ENVIRONMENT, CONFIG_RESOURCE
sys.path.append(os.path.abspath(utility.SYSTEM_PLUGINS_DIR))
sys.path.append(os.path.abspath(utility.USER_PLUGINS_DIR))
SYSTEM_PLUGINS_DIR = os.path.join(SAFE_EYES_HOME_DIR, 'plugins')
USER_PLUGINS_DIR = os.path.join(SAFE_EYES_CONFIG_DIR, 'plugins')
class PluginLoader:
def __init__(self):
self.__plugins: Dict[str, PluginProxy] = {}
def load(self, config: Config) -> List[PluginProxy]:
def load(self, context: Context) -> List[PluginProxy]:
# Load the plugins
for plugin in config.get('plugins'):
for plugin in context.config.get('plugins'):
try:
self.__load_plugin(plugin)
self.__load_plugin(context, plugin)
except BaseException:
logging.exception('Error in loading the plugin: %s', plugin['id'])
continue
return list(self.__plugins.values())
def __load_plugin(self, plugin: dict):
def __load_plugin(self, context: Context, plugin: dict):
"""
Load the given plugin.
"""
@ -86,8 +92,8 @@ class PluginLoader:
# The plugin is already enabled or partially loaded due to break_override_allowed
plugin_obj: PluginProxy = self.__plugins[plugin_id]
# Validate the dependencies again
if utility.check_plugin_dependencies(plugin_id, plugin_config, plugin.get('settings', {}),
plugin_path):
if PluginLoader.__check_plugin_dependencies(context, plugin_id, plugin_config,
plugin.get('settings', {}), plugin_path):
plugin_obj.disable()
del self.__plugins[plugin_id]
return
@ -100,18 +106,60 @@ class PluginLoader:
else:
# This is the first time to load the plugin
# Check for dependencies
if PluginLoader.__check_plugin_dependencies(plugin['id'], plugin_config, plugin.get('settings', {}),
plugin_path):
if PluginLoader.__check_plugin_dependencies(context, plugin['id'], plugin_config,
plugin.get('settings', {}), plugin_path):
return
# Load the plugin module
module = importlib.import_module((plugin['id'] + '.plugin'))
logging.info("Successfully loaded %s", str(module))
plugin_obj = PluginProxy(plugin['id'], module, plugin_enabled, plugin_config,
dict(plugin.get('settings', {})))
logging.info("Successfully loaded '%s' plugin from '%s'", plugin['id'], str(module.__file__))
new_settings = dict(plugin.get('settings', {}))
new_settings['path'] = os.path.join(plugin_dir, plugin_id)
plugin_obj = PluginProxy(plugin['id'], module, plugin_enabled, plugin_config, new_settings)
self.__plugins[plugin['id']] = plugin_obj
plugin_obj.enable()
@staticmethod
def load_plugins_config(context: Context, config: Config):
"""
Load all the plugins from the given directory.
"""
configs = []
for plugin in config.get('plugins'):
plugin_path = os.path.join(SYSTEM_PLUGINS_DIR, plugin['id'])
if not os.path.isdir(plugin_path):
# User plugin
plugin_path = os.path.join(USER_PLUGINS_DIR, plugin['id'])
plugin_config_path = os.path.join(plugin_path, 'config.json')
plugin_icon_path = os.path.join(plugin_path, 'icon.png')
plugin_module_path = os.path.join(plugin_path, 'plugin.py')
if not os.path.isfile(plugin_module_path):
return
icon = None
if os.path.isfile(plugin_icon_path):
icon = plugin_icon_path
else:
icon = system.get_resource_path('ic_plugin.png')
config = utility.load_json(plugin_config_path)
if config is None:
continue
dependency_description = PluginLoader.__check_plugin_dependencies(context, plugin['id'], config,
plugin.get('settings', {}), plugin_path)
if dependency_description:
plugin['enabled'] = False
config['error'] = True
config['meta']['description'] = dependency_description
icon = system.get_resource_path('ic_warning.png')
else:
config['error'] = False
config['id'] = plugin['id']
config['icon'] = icon
config['enabled'] = plugin['enabled']
for setting in config['settings']:
setting['safeeyes_config'] = plugin['settings']
configs.append(config)
return configs
@staticmethod
def __remove_if_exists(list_of_items: List, item: Any):
"""
@ -121,7 +169,8 @@ class PluginLoader:
list_of_items.remove(item)
@staticmethod
def __check_plugin_dependencies(plugin_id, plugin_config, plugin_settings, plugin_path):
def __check_plugin_dependencies(context: Context, plugin_id: str, plugin_config: dict, plugin_settings: dict,
plugin_path: str):
"""
Check the plugin dependencies.
"""
@ -133,12 +182,12 @@ class PluginLoader:
# Check the Python modules
for module in plugin_config['dependencies']['python_modules']:
if not utility.module_exist(module):
if not system.module_exists(module):
return _("Please install the Python module '%s'") % module
# Check the shell commands
for command in plugin_config['dependencies']['shell_commands']:
if not utility.command_exist(command):
if not system.command_exists(command):
return _("Please install the command-line tool '%s'") % command
# Check the resources
@ -150,7 +199,8 @@ class PluginLoader:
plugin_dependency_checker = os.path.join(plugin_path, 'dependency_checker.py')
if os.path.isfile(plugin_dependency_checker):
dependency_checker = importlib.import_module((plugin_id + '.dependency_checker'))
if dependency_checker and hasattr(dependency_checker, "validate"):
return dependency_checker.validate(plugin_config, plugin_settings)
if dependency_checker and hasattr(dependency_checker, "validate") and len(
inspect.getfullargspec(getattr(dependency_checker, "validate")).args) == 3:
return dependency_checker.validate(context, plugin_config, plugin_settings)
return None

View File

@ -20,7 +20,7 @@ from safeeyes.context import Context
from safeeyes.plugin_utils.proxy import PluginProxy
from safeeyes.spi.api import PluginAPI
from safeeyes.spi.breaks import Break
from safeeyes.spi.plugin import TrayAction, Widget
from safeeyes.spi.plugin import TrayAction, Widget, BreakAction
class PluginManager(PluginAPI):
@ -33,24 +33,15 @@ class PluginManager(PluginAPI):
for plugin in self.__plugins:
plugin.init(context, {})
def is_break_allowed(self, break_obj: Break) -> bool:
def get_break_action(self, break_obj: Break) -> BreakAction:
"""
This function is called before on_pre_break and on_start_break.
"""
for plugin in self.__plugins:
if not plugin.is_break_allowed(break_obj):
return False
return True
def is_break_skipped(self, break_obj: Break) -> bool:
for plugin in self.__plugins:
if plugin.is_break_skipped(break_obj):
return True
return False
def get_postpone_time(self, break_obj: Break) -> int:
for plugin in self.__plugins:
duration = plugin.get_postpone_time(break_obj)
if duration > 0:
return duration
return -1
action = plugin.get_break_action(break_obj)
if action is not None and action.not_allowed():
return action
return BreakAction.allow()
def on_pre_break(self, break_obj: Break) -> None:
for plugin in self.__plugins:
@ -96,6 +87,7 @@ class PluginManager(PluginAPI):
for plugin in self.__plugins:
plugin.on_exit()
def update_next_break(self, break_obj: Break, next_short_break: datetime, next_long_break: datetime) -> None:
def update_next_break(self, break_obj: Break, next_short_break: datetime.datetime,
next_long_break: datetime.datetime) -> None:
for plugin in self.__plugins:
plugin.update_next_break(break_obj, next_short_break, next_long_break)

View File

@ -17,11 +17,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import abc
from datetime import datetime
from typing import Optional
from safeeyes.context import Context
from safeeyes.spi.breaks import Break
from safeeyes.spi.plugin import Widget, TrayAction
from safeeyes.spi.plugin import Widget, TrayAction, BreakAction
class Plugin(abc.ABC):
@ -31,29 +32,13 @@ class Plugin(abc.ABC):
"""
pass
def is_break_allowed(self, break_obj: Break) -> bool:
def get_break_action(self, break_obj: Break) -> Optional[BreakAction]:
"""
This function is called right before the pre-break and start break calls.
Plugins must implement this function if they want to skip a break.
Called just before on_pre_break and on_start_break.
This is the opportunity for plugins to skip/postpone a break.
None means BreakAction.allow()
"""
return True
def is_break_skipped(self, break_obj: Break) -> bool:
"""
his function is called right after calling the is_break_allowed function if the output of
is_break_allowed is False. Plugins can return `True` if they want to skip the break and move to the next break.
If the output is `False`, the get_postpone_time function will be called.
"""
return False
def get_postpone_time(self, break_obj: Break) -> int:
"""
This function is called right after calling the is_break_skipped function if the output of
is_break_skipped is False. Plugins can return a positive time in millis to postpone the break.
Zero or negative value indicates that the plugin doesn't want to postpone the break which
in turns will postpone the current break by a duration equivalent to the interval.
"""
return -1
return BreakAction.allow()
def on_pre_break(self, break_obj: Break) -> None:
"""
@ -110,7 +95,7 @@ class Plugin(abc.ABC):
"""
pass
def update_next_break(self, break_obj: Break, next_short_break: int, next_long_break: int) -> None:
def update_next_break(self, break_obj: Break, next_short_break: datetime, next_long_break: datetime) -> None:
"""
Called when the next break is scheduled.
"""

View File

@ -14,12 +14,13 @@
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
import inspect
from datetime import datetime
from typing import Optional, Any
from safeeyes.context import Context
from safeeyes.plugin_utils.plugin import Plugin
from safeeyes.spi.breaks import Break
from safeeyes.spi.plugin import Widget, TrayAction
from safeeyes.spi.plugin import Widget, TrayAction, BreakAction
class PluginProxy(Plugin):
@ -68,35 +69,13 @@ class PluginProxy(Plugin):
if PluginProxy.__has_method(self.__plugin, 'init', 2):
self.__plugin.init(context, self.__settings)
def is_break_allowed(self, break_obj: Break) -> bool:
def get_break_action(self, break_obj: Break) -> Optional[BreakAction]:
"""
This function is called right before the pre-break and start break calls.
Plugins must implement this function if they want to skip a break.
This function is called before on_pre_break and on_start_break.
"""
if self.__is_supported(break_obj) and PluginProxy.__has_method(self.__plugin, 'is_break_allowed', 1):
return self.__plugin.is_break_allowed(break_obj)
return True
def is_break_skipped(self, break_obj: Break) -> bool:
"""
his function is called right after calling the is_break_allowed function if the output of
is_break_allowed is False. Plugins can return `True` if they want to skip the break and move to the next break.
If the output is `False`, the get_postpone_time function will be called.
"""
if self.__is_supported(break_obj) and PluginProxy.__has_method(self.__plugin, 'is_break_skipped', 1):
return self.__plugin.is_break_skipped(break_obj)
return False
def get_postpone_time(self, break_obj: Break) -> int:
"""
This function is called right after calling the is_break_skipped function if the output of
is_break_skipped is False. Plugins can return a positive time in millis to postpone the break.
Zero or negative value indicates that the plugin doesn't want to postpone the break which
in turns will postpone the current break by a duration equivalent to the interval.
"""
if self.__is_supported(break_obj) and PluginProxy.__has_method(self.__plugin, 'get_postpone_time', 1):
return self.__plugin.get_postpone_time(break_obj)
return -1
if PluginProxy.__has_method(self.__plugin, 'get_break_action', 1):
return self.__plugin.get_break_action(break_obj)
return BreakAction.allow()
def on_pre_break(self, break_obj: Break) -> None:
"""
@ -132,7 +111,7 @@ class PluginProxy(Plugin):
Return an optional break screen widget.
"""
if self.__is_supported(break_obj) and PluginProxy.__has_method(self.__plugin, 'get_widget', 1):
self.__plugin.get_widget(break_obj)
return self.__plugin.get_widget(break_obj)
return None
def get_tray_action(self, break_obj: Break) -> Optional[TrayAction]:
@ -140,7 +119,7 @@ class PluginProxy(Plugin):
Return an optional break screen widget.
"""
if self.__is_supported(break_obj) and PluginProxy.__has_method(self.__plugin, 'get_tray_action', 1):
self.__plugin.get_tray_action(break_obj)
return self.__plugin.get_tray_action(break_obj)
return None
def on_start(self) -> None:
@ -164,7 +143,7 @@ class PluginProxy(Plugin):
if self.__enabled and PluginProxy.__has_method(self.__plugin, 'on_exit', 0):
self.__plugin.on_exit()
def update_next_break(self, break_obj: Break, next_short_break: int, next_long_break: int) -> None:
def update_next_break(self, break_obj: Break, next_short_break: datetime, next_long_break: datetime) -> None:
"""
Called when the next break is scheduled.
"""

View File

@ -1,80 +1,80 @@
# #!/usr/bin/env python
# # Safe Eyes is a utility to remind you to take break frequently
# # to protect your eyes from eye strain.
#
# # Copyright (C) 2017 Gobinath
#
# # This program is free software: you can redistribute it and/or modify
# # it under the terms of the GNU General Public License as published by
# # the Free Software Foundation, either version 3 of the License, or
# # (at your option) any later version.
#
# # This program is distributed in the hope that it will be useful,
# # but WITHOUT ANY WARRANTY; without even the implied warranty of
# # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# # GNU General Public License for more details.
#
# # You should have received a copy of the GNU General Public License
# # along with this program. If not, see <http://www.gnu.org/licenses/>.
# """
# Audible Alert plugin plays a sound after each breaks to notify the user that the break has end.
# """
#
# import logging
#
# from safeeyes import utility
#
# context = None
# pre_break_alert = False
# post_break_alert = False
#
#
# def play_sound(resource_name):
# """Play the audio resource.
#
# Arguments:
# resource_name {string} -- name of the wav file resource
# """
# logging.info('Playing audible alert %s', resource_name)
# try:
# # Open the sound file
# path = utility.get_resource_path(resource_name)
# if path is None:
# return
# utility.execute_command('aplay', ['-q', path])
#
# except BaseException:
# logging.error('Failed to play audible alert %s', resource_name)
#
#
# def init(ctx, plugin_config):
# """
# Initialize the plugin.
# """
# global context
# global pre_break_alert
# global post_break_alert
# logging.debug('Initialize Audible Alert plugin')
# context = ctx
# pre_break_alert = plugin_config['pre_break_alert']
# post_break_alert = plugin_config['post_break_alert']
#
#
# def on_pre_break(break_obj):
# """Play the pre_break sound if the option is enabled.
#
# Arguments:
# break_obj {safeeyes.model.Break} -- the break object
# """
# if pre_break_alert:
# play_sound('on_pre_break.wav')
#
#
# def on_stop_break():
# """
# After the break, play the alert sound
# """
# # Do not play if the break is skipped or postponed
# if context['skipped'] or context['postponed'] or not post_break_alert:
# return
# play_sound('on_stop_break.wav')
#!/usr/bin/env python
# Safe Eyes is a utility to remind you to take break frequently
# to protect your eyes from eye strain.
# Copyright (C) 2017 Gobinath
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Audible Alert plugin plays a sound after each breaks to notify the user that the break has end.
"""
import logging
from safeeyes import utility
from safeeyes.context import Context
from safeeyes.env import system
from safeeyes.spi.breaks import Break
class Player:
def __init__(self, config: dict):
self.__pre_break_enabled: bool = config.get('pre_break_alert', False)
self.__post_break_enabled: bool = config.get('post_break_alert', False)
def play_pre_break(self) -> None:
if self.__pre_break_enabled:
Player.__play('on_pre_break.wav')
def play_stop_break(self) -> None:
if self.__post_break_enabled:
Player.__play('on_stop_break.wav')
@staticmethod
def __play(resource: str) -> None:
logging.info('Audible Alert: playing audio %s', resource)
# Open the sound file
path = utility.get_resource_path(resource)
if path is not None:
system.execute(['aplay', '-q', path])
player: Player
def init(ctx: Context, plugin_config: dict) -> None:
"""
Initialize the plugin.
"""
global player
logging.info('Audible Alert: initialize the plugin')
player = Player(plugin_config)
def on_pre_break(break_obj: Break) -> None:
"""
Play the pre_break sound if the option is enabled.
"""
player.play_pre_break()
def on_stop_break(break_obj: Break, skipped: bool, postponed: bool) -> None:
"""
After the break, play the alert sound
"""
# Do not play if the break is skipped or postponed
if skipped or postponed:
return
player.play_stop_break()

View File

@ -19,12 +19,10 @@
import logging
import os
import time
from typing import List
import gi
from Xlib.display import Display
from Xlib.display import X
from safeeyes import utility
from safeeyes.context import Context
@ -212,7 +210,7 @@ class BreakScreen:
# Set visual to apply css theme. It should be called before show method.
window.set_visual(window.get_screen().get_rgba_visual())
if self.__context.env().name == 'kde':
if self.__context.env.name == 'kde':
# Fix flickering screen in KDE by setting opacity to 1
window.set_opacity(0.9)
@ -244,35 +242,35 @@ class BreakScreen:
logging.info("Lock the keyboard")
self.__keyboard_locked = True
# Grab the keyboard
root = self.__display.screen().root
root.change_attributes(event_mask=X.KeyPressMask | X.KeyReleaseMask)
root.grab_keyboard(True, X.GrabModeAsync, X.GrabModeAsync, X.CurrentTime)
# Consume keyboard events
while self.__keyboard_locked:
if self.__display.pending_events() > 0:
# Avoid waiting for next event by checking pending events
event = self.__display.next_event()
if self.__enable_shortcut and event.type == X.KeyPress:
if self.__allow_skipping and event.detail == self.__keycode_shortcut_skip:
self.skip_break()
break
elif self.__allow_postponing and event.detail == self.__keycode_shortcut_postpone:
self.postpone_break()
break
else:
# Reduce the CPU usage by sleeping for a second
time.sleep(1)
# # Grab the keyboard
# root = self.__display.screen().root
# root.change_attributes(event_mask=X.KeyPressMask | X.KeyReleaseMask)
# root.grab_keyboard(True, X.GrabModeAsync, X.GrabModeAsync, X.CurrentTime)
#
# # Consume keyboard events
# while self.__keyboard_locked:
# if self.__display.pending_events() > 0:
# # Avoid waiting for next event by checking pending events
# event = self.__display.next_event()
# if self.__enable_shortcut and event.type == X.KeyPress:
# if self.__allow_skipping and event.detail == self.__keycode_shortcut_skip:
# self.skip_break()
# break
# elif self.__allow_postponing and event.detail == self.__keycode_shortcut_postpone:
# self.postpone_break()
# break
# else:
# # Reduce the CPU usage by sleeping for a second
# time.sleep(1)
def __release_keyboard(self):
"""
Release the locked keyboard.
"""
logging.info("Unlock the keyboard")
logging.info("Break Screen: unlock the keyboard")
self.__keyboard_locked = False
self.__display.ungrab_keyboard(X.CurrentTime)
self.__display.flush()
# self.__display.ungrab_keyboard(X.CurrentTime)
# self.__display.flush()
@main
def __destroy_all_screens(self):
@ -301,6 +299,7 @@ def init(context: Context, config: dict) -> None:
"""
This function is called to initialize the plugin.
"""
logging.info('Break Screen: initialize the plugin')
global safe_eyes_context, break_config
safe_eyes_context = context
break_config = config

View File

@ -17,14 +17,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from safeeyes import utility
from safeeyes.context import Context
def validate(plugin_config, plugin_settings):
command = None
if utility.IS_WAYLAND:
command = "wlrctl"
else:
command = "xprop"
def validate(ctx: Context, plugin_config: dict, plugin_settings: dict):
command = "wlrctl" if ctx.env.is_wayland() else "xprop"
if not utility.command_exist(command):
return _("Please install the command-line tool '%s'") % command
else:

View File

@ -1,172 +1,154 @@
# #!/usr/bin/env python
# # Safe Eyes is a utility to remind you to take break frequently
# # to protect your eyes from eye strain.
#
# # Copyright (C) 2017 Gobinath
#
# # This program is free software: you can redistribute it and/or modify
# # it under the terms of the GNU General Public License as published by
# # the Free Software Foundation, either version 3 of the License, or
# # (at your option) any later version.
#
# # This program is distributed in the hope that it will be useful,
# # but WITHOUT ANY WARRANTY; without even the implied warranty of
# # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# # GNU General Public License for more details.
#
# # You should have received a copy of the GNU General Public License
# # along with this program. If not, see <http://www.gnu.org/licenses/>.
# """
# Skip Fullscreen plugin skips the break if the active window is fullscreen.
# NOTE: Do not remove the unused import 'GdkX11' because it is required in Ubuntu 14.04
# """
#
# import logging
# import os
# import re
# import subprocess
#
# import gi
#
# from safeeyes.plugin_utils.spi import PluginResponse
#
# gi.require_version('Gdk', '3.0')
# from gi.repository import Gdk
# from gi.repository import GdkX11 # noqa F401
# from safeeyes import utility
#
# context = None
# skip_break_window_classes = []
# take_break_window_classes = []
# unfullscreen_allowed = True
# dnd_while_on_battery = False
#
#
# def is_active_window_skipped_wayland(pre_break):
# cmdlist = ['wlrctl', 'toplevel', 'find', 'state:fullscreen']
# try:
# process = subprocess.Popen(cmdlist, stdout=subprocess.PIPE)
# process.communicate()[0]
# if process.returncode == 0:
# return True
# elif process.returncode == 1:
# return False
# elif process.returncode == 127:
# logging.warning('Could not find wlrctl needed to detect fullscreen under wayland')
# return False
# except subprocess.CalledProcessError:
# logging.warning('Error in finding full-screen application')
# return False
#
#
# def is_active_window_skipped_xorg(pre_break):
# """
# Check for full-screen applications.
# This method must be executed by the main thread. If not, it will cause random failure.
# """
# logging.info('Searching for full-screen application')
# screen = Gdk.Screen.get_default()
#
# active_window = screen.get_active_window()
# if active_window:
# active_xid = str(active_window.get_xid())
# cmdlist = ['xprop', '-root', '-notype', '-id',
# active_xid, 'WM_CLASS', '_NET_WM_STATE']
#
# try:
# stdout = subprocess.check_output(cmdlist).decode('utf-8')
# except subprocess.CalledProcessError:
# logging.warning('Error in finding full-screen application')
# else:
# if stdout:
# is_fullscreen = 'FULLSCREEN' in stdout
# # Extract the process name
# process_names = re.findall('"(.+?)"', stdout)
# if process_names:
# process = process_names[1].lower()
# if process in skip_break_window_classes:
# return True
# elif process in take_break_window_classes:
# if is_fullscreen and unfullscreen_allowed and not pre_break:
# try:
# active_window.unfullscreen()
# except BaseException:
# logging.error(
# 'Error in unfullscreen the window ' + process)
# return False
#
# return is_fullscreen
#
# return False
#
#
# def is_on_battery():
# """
# Check if the computer is running on battery.
# """
# on_battery = False
# available_power_sources = os.listdir('/sys/class/power_supply')
# logging.info('Looking for battery status in available power sources: %s' % str(
# available_power_sources))
# for power_source in available_power_sources:
# if 'BAT' in power_source:
# # Found battery
# battery_status = os.path.join(
# '/sys/class/power_supply', power_source, 'status')
# if os.path.isfile(battery_status):
# # Additional check to confirm that the status file exists
# try:
# with open(battery_status, 'r') as status_file:
# status = status_file.read()
# if status:
# on_battery = 'discharging' in status.lower()
# except BaseException:
# logging.error('Failed to read %s' % battery_status)
# break
# return on_battery
#
#
# def init(ctx, safeeyes_config, plugin_config):
# global context
# global skip_break_window_classes
# global take_break_window_classes
# global unfullscreen_allowed
# global dnd_while_on_battery
# logging.debug('Initialize Skip Fullscreen plugin')
# context = ctx
# skip_break_window_classes = plugin_config['skip_break_windows'].split()
# take_break_window_classes = plugin_config['take_break_windows'].split()
# unfullscreen_allowed = plugin_config['unfullscreen']
# dnd_while_on_battery = plugin_config['while_on_battery']
#
#
# def on_pre_break(break_obj):
# """
# Lifecycle method executes before the pre-break period.
# """
# if utility.IS_WAYLAND:
# skip_break = is_active_window_skipped_wayland(True)
# else:
# skip_break = is_active_window_skipped_xorg(True)
# if dnd_while_on_battery and not skip_break:
# skip_break = is_on_battery()
# if skip_break:
# return PluginResponse.SKIP
# else:
# return PluginResponse.CONTINUE
#
#
# def on_start_break(break_obj) -> PluginResponse:
# """
# Lifecycle method executes just before the break.
# """
# if utility.IS_WAYLAND:
# skip_break = is_active_window_skipped_wayland(True)
# else:
# skip_break = is_active_window_skipped_xorg(True)
# if dnd_while_on_battery and not skip_break:
# skip_break = is_on_battery()
# if skip_break:
# return PluginResponse.SKIP
# else:
# return PluginResponse.CONTINUE
#!/usr/bin/env python
# Safe Eyes is a utility to remind you to take break frequently
# to protect your eyes from eye strain.
# Copyright (C) 2017 Gobinath
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Skip Fullscreen plugin skips the break if the active window is fullscreen.
NOTE: Do not remove the unused import 'GdkX11' because it is required in Ubuntu 14.04
"""
import logging
import os
import re
import subprocess
from typing import Optional
import gi
from safeeyes.context import Context
from safeeyes.spi.breaks import Break
from safeeyes.spi.plugin import BreakAction
from safeeyes.spi.state import State
gi.require_version('Gdk', '3.0')
from gi.repository import Gdk
from gi.repository import GdkX11 # noqa F401
class SystemState:
def __init__(self, ctx: Context, config: dict):
self.__context = ctx
self.__skip_break_window_classes = config['skip_break_windows'].split()
self.__take_break_window_classes = config['take_break_windows'].split()
self.__unfullscreen_allowed = config['unfullscreen']
self.__dnd_while_on_battery = config['while_on_battery']
def skip_break(self) -> bool:
if self.__dnd_while_on_battery and SystemState.__is_on_battery():
logging.debug("Do Not Disturb: skipping the break as the system is on battery")
return True
return self.__is_wayland_full_screen() if self.__context.env.is_wayland() else self.__is_xorg_full_screen()
def __is_wayland_full_screen(self) -> bool:
logging.info('Do Not Disturb: searching for full-screen application in wayland')
cmdlist = ['wlrctl', 'toplevel', 'find', 'state:fullscreen']
try:
process = subprocess.Popen(cmdlist, stdout=subprocess.PIPE)
process.communicate()[0]
if process.returncode == 0:
return True
elif process.returncode == 1:
return False
elif process.returncode == 127:
logging.warning('Do Not Disturb: could not find wlrctl needed to detect fullscreen under wayland')
return False
except subprocess.CalledProcessError:
logging.warning('Do Not Disturb: error in finding full-screen application')
return False
def __is_xorg_full_screen(self) -> bool:
"""
Check for full-screen applications.
This method must be executed by the main thread. If not, it will cause random failure.
"""
logging.info('Do Not Disturb: searching for full-screen application in xorg')
screen = Gdk.Screen.get_default()
active_window = screen.get_active_window()
if active_window:
active_xid = str(active_window.get_xid())
cmdlist = ['xprop', '-root', '-notype', '-id',
active_xid, 'WM_CLASS', '_NET_WM_STATE']
try:
stdout = subprocess.check_output(cmdlist).decode('utf-8')
except subprocess.CalledProcessError:
logging.warning('Do Not Disturb: error in finding full-screen application')
else:
if stdout:
is_fullscreen = 'FULLSCREEN' in stdout
# Extract the process name
process_names = re.findall('"(.+?)"', stdout)
if process_names:
process = process_names[1].lower()
if process in self.__skip_break_window_classes:
return True
elif process in self.__take_break_window_classes:
if is_fullscreen and self.__unfullscreen_allowed and self.__context.state == State.BREAK:
# Unfullscreen a window only if the break is ready to be taken (not during pre_break state)
try:
active_window.unfullscreen()
except BaseException:
logging.error('Do Not Disturb: error in unfullscreen the window ' + process)
return False
return is_fullscreen
return False
@staticmethod
def __is_on_battery():
"""
Check if the computer is running on battery.
"""
on_battery = False
power_sources = os.listdir('/sys/class/power_supply')
logging.info('Do Not Disturb: looking for battery status in available power sources: %s' % str(power_sources))
for power_source in power_sources:
if 'BAT' in power_source:
# Found battery
battery_status = os.path.join(
'/sys/class/power_supply', power_source, 'status')
if os.path.isfile(battery_status):
# Additional check to confirm that the status file exists
try:
with open(battery_status, 'r') as status_file:
status = status_file.read()
if status:
on_battery = 'discharging' in status.lower()
except BaseException:
logging.error('Failed to read %s' % battery_status)
break
return on_battery
system_state: SystemState
def init(ctx: Context, plugin_config: dict):
logging.debug('Do Not Disturb: initialize the plugin')
global system_state
system_state = SystemState(ctx, plugin_config)
def get_break_action(self, break_obj: Break) -> Optional[BreakAction]:
"""
Called just before on_pre_break and on_start_break.
This is the opportunity for plugins to skip/postpone a break.
"""
return BreakAction.postpone() if system_state.skip_break() else BreakAction.allow()

View File

@ -16,25 +16,27 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
from safeeyes import utility
from safeeyes.context import Context
from safeeyes.env import system
def _get_next_reset_time(current_time, statistics_reset_cron):
import croniter
def __is_valid_cron(expr: str) -> bool:
from croniter import croniter
from croniter.croniter import CroniterBadCronError
try:
cron = croniter.croniter(statistics_reset_cron, current_time)
return cron.get_next(datetime.datetime)
except:
return bool(croniter.expand(expr))
except CroniterBadCronError:
# Error in getting the next reset time
return None
return False
def validate(plugin_config, plugin_settings):
if not utility.module_exist("croniter"):
return _("Please install the Python module '%s'") % "croniter"
def validate(ctx: Context, plugin_config: dict, plugin_settings: dict):
if not system.module_exists("croniter"):
return _("Please install the Python module 'croniter'")
# Validate the cron expression
statistics_reset_cron = plugin_settings.get('statistics_reset_cron', '0 0 * * *')
if _get_next_reset_time(datetime.datetime.now(), statistics_reset_cron) is None:
if __is_valid_cron(statistics_reset_cron) is None:
return _("Invalid cron expression '%s'") % statistics_reset_cron
else:
return None

View File

@ -22,197 +22,184 @@ Show health statistics on the break screen.
import datetime
import logging
from typing import Optional, Tuple
import croniter
from croniter import CroniterBadCronError
from safeeyes.context import Context
context = None
no_of_skipped_breaks = 0
no_of_breaks = 0
session = None
safe_eyes_start_time = datetime.datetime.now()
total_idle_time = 0
last_screen_time = -1
statistics_reset_cron = '0 0 * * *' # Every midnight
time_to_reset_break = datetime.datetime.now()
next_reset_time = None
enabled = True
from safeeyes.spi.breaks import Break
from safeeyes.spi.plugin import Widget
def init(ctx: Context, config: dict):
class HealthStats:
def __init__(self, context: Context, config: dict):
self.__context: Context = context
self.__cron_expr: str = config.get('statistics_reset_cron', '0 0 * * *')
# Compute the next reset time
self.__safe_eyes_start_time = datetime.datetime.now()
self.__next_reset_time: Optional[datetime.datetime] = self.__get_next_reset_time()
self.enabled: bool = self.__next_reset_time is not None
self.__session: dict = context.session.get_plugin('healthstats')
if not self.enabled:
# There is an error in the cron expression
logging.error(
"Health Stats: error in parsing the cron expression `%s`. Health Stats plugin is disabled." % self.__cron_expr)
return
if not self.__session:
# Empty session
self.__session['no_of_skipped_breaks'] = 0
self.__session['no_of_breaks'] = 0
self.__session['safe_eyes_start_time'] = self.__safe_eyes_start_time.strftime("%Y-%m-%d %H:%M:%S")
self.__session['total_idle_time'] = 0
self.__session['last_screen_time'] = -1
self.__session[
'next_reset_time'] = None if self.__next_reset_time is None else self.__next_reset_time.strftime(
"%Y-%m-%d %H:%M:%S")
context.session.set_plugin('healthstats', self.__session)
else:
last_start_time: str = self.__session.get('safe_eyes_start_time')
last_reset_time = self.__session.get('next_reset_time')
if last_start_time:
self.__safe_eyes_start_time = datetime.datetime.strptime(last_start_time, "%Y-%m-%d %H:%M:%S")
if last_reset_time:
self.__next_reset_time = datetime.datetime.strptime(last_reset_time, "%Y-%m-%d %H:%M:%S")
self.reset_stats()
def reset_stats(self) -> int:
# Check if the reset time has passed
current_time = datetime.datetime.now()
total_duration_sec = (current_time - self.__safe_eyes_start_time).total_seconds()
if self.__next_reset_time is not None and current_time >= self.__next_reset_time:
logging.debug("Health Stats: resetting the statistics")
if self.__safe_eyes_start_time < self.__next_reset_time:
# Safe Eyes is running even before the reset time
# Consider the reset time as the new start time
self.__safe_eyes_start_time = self.__next_reset_time
total_duration_sec = (current_time - self.__safe_eyes_start_time).total_seconds()
self.__session['no_of_breaks'] = 0
self.__session['no_of_skipped_breaks'] = 0
self.__session['safe_eyes_start_time'] = self.__safe_eyes_start_time.strftime("%Y-%m-%d %H:%M:%S")
self.__session['total_idle_time'] = 0
self.__session['last_screen_time'] = round(
(total_duration_sec - self.__session.get('total_idle_time', 0)) / 60)
# Update the next_reset_time
self.__next_reset_time = self.__get_next_reset_time()
if self.__next_reset_time is None:
# This condition is added for a safety but not expected to run
self.enabled = False
self.__session['next_reset_time'] = None
else:
self.__session['next_reset_time'] = self.__next_reset_time.strftime("%Y-%m-%d %H:%M:%S")
return int(total_duration_sec)
def count_break(self) -> None:
self.__session['no_of_breaks'] = self.__session.get('no_of_breaks', 0) + 1
def count_skipped_break(self) -> None:
self.__session['no_of_skipped_breaks'] = self.__session.get('no_of_skipped_breaks', 0) + 1
def update_idle_time(self) -> int:
self.__session['total_idle_time'] = self.get_total_idle_time() + self.__get_idle_time()
return self.__session['total_idle_time']
def get_total_idle_time(self) -> int:
return self.__session.get('total_idle_time', 0)
def get_stats(self) -> Tuple[int, int, int, int, int]:
no_breaks = self.__session.get('no_of_breaks', 1)
skipped_breaks = self.__session.get('no_of_skipped_breaks', 0)
total_duration = self.reset_stats()
screen_time = round((total_duration - self.get_total_idle_time()) / 60)
last_screen_time = self.__session.get('last_screen_time', 0)
return no_breaks, skipped_breaks, total_duration, screen_time, last_screen_time
def __get_idle_time(self) -> int:
"""
Get the system idle time from the Smart Pause plugin.
"""
session_config = self.__context.session.get_plugin('smartpause')
return session_config.get('idle_period', 0)
def __get_next_reset_time(self) -> Optional[datetime.datetime]:
try:
cron = croniter.croniter(self.__cron_expr, datetime.datetime.now())
next_time = cron.get_next(datetime.datetime)
logging.debug("Health Stats: statistics will be reset at " + next_time.strftime("%Y-%m-%d %H:%M:%S"))
return next_time
except CroniterBadCronError:
# Error in getting the next reset time
return None
stats: HealthStats
def init(ctx: Context, plugin_config: dict) -> None:
"""
Initialize the plugin.
"""
global enabled
global context
global session
global no_of_skipped_breaks
global no_of_breaks
global statistics_reset_cron
global safe_eyes_start_time
global total_idle_time
global last_screen_time
global next_reset_time
logging.debug('Initialize Health Stats plugin')
# context = ctx
# statistics_reset_cron = plugin_config.get('statistics_reset_cron', '0 0 * * *')
# # Compute the next reset time
# next_reset_time = _get_next_reset_time(datetime.datetime.now(), statistics_reset_cron)
# enabled = next_reset_time is not None
#
# if not enabled:
# # There is an error in the cron expression
# logging.error("Error in parsing the cron expression `" + statistics_reset_cron + "`. Health Stats plugin is disabled.")
# return
#
# if session is None:
# # Read the session
# session = context['session']['plugin'].get('healthstats', None)
# if session is None:
# session = {'no_of_skipped_breaks': 0,
# 'no_of_breaks': 0,
# 'safe_eyes_start_time': safe_eyes_start_time.strftime("%Y-%m-%d %H:%M:%S"),
# 'total_idle_time': 0,
# 'last_screen_time': -1,
# 'next_reset_time': next_reset_time.strftime("%Y-%m-%d %H:%M:%S")}
# context['session']['plugin']['healthstats'] = session
# no_of_skipped_breaks = session.get('no_of_skipped_breaks', 0)
# no_of_breaks = session.get('no_of_breaks', 0)
# total_idle_time = session.get('total_idle_time', 0)
# last_screen_time = session.get('last_screen_time', -1)
# str_time = session.get('safe_eyes_start_time', None)
# str_next_reset_time = session.get('next_reset_time', None)
# if str_time:
# safe_eyes_start_time = datetime.datetime.strptime(str_time, "%Y-%m-%d %H:%M:%S")
# if str_next_reset_time:
# next_reset_time = datetime.datetime.strptime(str_time, "%Y-%m-%d %H:%M:%S")
#
# _reset_stats()
logging.debug('Health Stats: initialize the plugin')
global stats
stats = HealthStats(ctx, plugin_config)
def on_stop_break():
def on_start_break(break_obj: Break) -> None:
if stats.enabled:
stats.count_break()
def on_stop_break(break_obj: Break, skipped: bool, postponed: bool) -> None:
"""
After the break, check if it is skipped.
"""
if stats.enabled and skipped:
stats.count_skipped_break()
def get_widget(break_obj: Break) -> Optional[Widget]:
# Check if the plugin is enabled
if not enabled:
return
#
# global no_of_skipped_breaks
# if context['skipped']:
# no_of_skipped_breaks += 1
# session['no_of_skipped_breaks'] = no_of_skipped_breaks
if not stats.enabled:
return None
no_breaks, skipped_breaks, total_duration, screen_time, last_screen_time = stats.get_stats()
hours, minutes = divmod(screen_time, 60)
time_format = '{:02d}:{:02d}'.format(hours, minutes)
if hours > 6 or round((skipped_breaks / no_breaks), 1) >= 0.2:
# Unhealthy behavior -> Red broken heart
heart = '💔️'
else:
# Healthy behavior -> Green heart
heart = '💚'
if last_screen_time < 0:
screen_time_diff = ''
else:
hrs_diff, mins_diff = divmod(abs(screen_time - last_screen_time), 60)
symbol = ''
if screen_time > last_screen_time:
symbol = '+'
elif screen_time < last_screen_time:
symbol = '-'
screen_time_diff = ' ( {}{:02d}:{:02d} )'.format(symbol, hrs_diff, mins_diff)
content = "{}\tBREAKS: {}\tSKIPPED: {}\tSCREEN TIME: {}{}".format(heart, no_breaks, skipped_breaks,
time_format, screen_time_diff)
return Widget(_('Health Statistics'), content)
def get_widget_title(break_obj):
def on_start() -> None:
"""
Return the widget title.
Add the idle period to the total idle time.
"""
# Check if the plugin is enabled
if not enabled:
return ""
# global no_of_breaks
# no_of_breaks += 1
# session['no_of_breaks'] = no_of_breaks
# session['safe_eyes_start_time'] = safe_eyes_start_time.strftime("%Y-%m-%d %H:%M:%S")
# session['total_idle_time'] = total_idle_time
# session['last_screen_time'] = last_screen_time
# return _('Health Statistics')
def _reset_stats():
global no_of_breaks
global safe_eyes_start_time
global total_idle_time
global no_of_skipped_breaks
global last_screen_time
global next_reset_time
# Check if the reset time has passed
current_time = datetime.datetime.now()
# total_duration_sec = (current_time - safe_eyes_start_time).total_seconds()
# if current_time >= next_reset_time:
# logging.debug("Resetting the health statistics")
# # Reset statistics
# if safe_eyes_start_time < next_reset_time:
# # Safe Eyes is running even before the reset time
# # Consider the reset time as the new start time
# safe_eyes_start_time = next_reset_time
# total_duration_sec = (current_time - safe_eyes_start_time).total_seconds()
#
# # Update the next_reset_time
# next_reset_time = _get_next_reset_time(current_time, statistics_reset_cron)
#
# last_screen_time = round((total_duration_sec - total_idle_time) / 60)
# total_idle_time = 0
# no_of_breaks = 0
# no_of_skipped_breaks = 0
# session['no_of_breaks'] = 0
# session['no_of_skipped_breaks'] = 0
# session['safe_eyes_start_time'] = safe_eyes_start_time.strftime("%Y-%m-%d %H:%M:%S")
# session['total_idle_time'] = total_idle_time
# session['last_screen_time'] = last_screen_time
# session['next_reset_time'] = next_reset_time.strftime("%Y-%m-%d %H:%M:%S")
#
# return total_duration_sec
# def get_widget_content(break_obj):
# """
# Return the statistics.
# """
# # Check if the plugin is enabled
# if not enabled:
# return ""
#
# total_duration_sec = _reset_stats()
# screen_time = round((total_duration_sec - total_idle_time) / 60)
# hours, minutes = divmod(screen_time, 60)
# time_format = '{:02d}:{:02d}'.format(hours, minutes)
# if hours > 6 or round((no_of_skipped_breaks / no_of_breaks), 1) >= 0.2:
# # Unhealthy behavior -> Red broken heart
# heart = '💔️'
# else:
# # Healthy behavior -> Green heart
# heart = '💚'
# if last_screen_time < 0:
# screen_time_diff = ''
# else:
# hrs_diff, mins_diff = divmod(abs(screen_time - last_screen_time), 60)
# symbol = ''
# if screen_time > last_screen_time:
# symbol = '+'
# elif screen_time < last_screen_time:
# symbol = '-'
# screen_time_diff = ' ( {}{:02d}:{:02d} )'.format(symbol, hrs_diff, mins_diff)
# return "{}\tBREAKS: {}\tSKIPPED: {}\tSCREEN TIME: {}{}".format(heart, no_of_breaks, no_of_skipped_breaks,
# time_format, screen_time_diff)
#
#
# def on_start():
# """
# Add the idle period to the total idle time.
# """
# # Check if the plugin is enabled
# if not enabled:
# return ""
#
# _reset_stats()
# global total_idle_time
# # idle_period is provided by Smart Pause plugin
# total_idle_time += context.get('idle_period', 0)
# session['total_idle_time'] = total_idle_time
#
#
# def _get_next_reset_time(current_time, statistics_reset_cron):
# try:
# cron = croniter.croniter(statistics_reset_cron, current_time)
# next_time = cron.get_next(datetime.datetime)
# logging.debug("Health stats will be reset at " + next_time.strftime("%Y-%m-%d %H:%M:%S"))
# return next_time
# except:
# # Error in getting the next reset time
# return None
if stats.enabled:
stats.reset_stats()
stats.update_idle_time()

View File

@ -1,77 +1,85 @@
# #!/usr/bin/env python
# # Safe Eyes is a utility to remind you to take break frequently
# # to protect your eyes from eye strain.
#
# # Copyright (C) 2019 Gobinath
#
# # This program is free software: you can redistribute it and/or modify
# # it under the terms of the GNU General Public License as published by
# # the Free Software Foundation, either version 3 of the License, or
# # (at your option) any later version.
#
# # This program is distributed in the hope that it will be useful,
# # but WITHOUT ANY WARRANTY; without even the implied warranty of
# # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# # GNU General Public License for more details.
#
# # You should have received a copy of the GNU General Public License
# # along with this program. If not, see <http://www.gnu.org/licenses/>.
# """
# Media Control plugin lets users to pause currently playing media player from the break screen.
# """
#
# import os
# import dbus
# import re
# import gi
# from safeeyes.plugin_utils.spi import TrayAction
# gi.require_version('Gtk', '3.0')
# from gi.repository import Gtk
#
# tray_icon_path = None
#
#
# def __active_players():
# """
# List of all media players which are playing now.
# """
# players = []
# bus = dbus.SessionBus()
#
# for service in bus.list_names():
# if re.match('org.mpris.MediaPlayer2.', service):
# player = bus.get_object(service, "/org/mpris/MediaPlayer2")
# interface = dbus.Interface(player, 'org.freedesktop.DBus.Properties')
# status = str(interface.Get('org.mpris.MediaPlayer2.Player', 'PlaybackStatus')).lower()
# if status == "playing":
# players.append(player)
# return players
#
#
# def __pause_players(players):
# """
# Pause all playing media players using dbus.
# """
# for player in players:
# interface = dbus.Interface(player, dbus_interface='org.mpris.MediaPlayer2.Player')
# interface.Pause()
#
#
# def init(ctx, safeeyes_config, plugin_config):
# """
# Initialize the screensaver plugin.
# """
# global tray_icon_path
# tray_icon_path = os.path.join(plugin_config['path'], "resource/pause.png")
#
#
# def get_tray_action(break_obj):
# """
# Return TrayAction only if there is a media player currently playing.
# """
# players = __active_players()
# if players:
# return TrayAction.build("Pause media",
# tray_icon_path,
# Gtk.STOCK_MEDIA_PAUSE,
# lambda: __pause_players(players))
#!/usr/bin/env python
# Safe Eyes is a utility to remind you to take break frequently
# to protect your eyes from eye strain.
# Copyright (C) 2019 Gobinath
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Media Control plugin lets users to pause currently playing media player from the break screen.
"""
import logging
import os
import re
from typing import Optional
import dbus
import gi
from safeeyes.context import Context
from safeeyes.spi.breaks import Break
from safeeyes.spi.plugin import TrayAction
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
tray_icon_path = None
def __active_players():
"""
List of all media players which are playing now.
"""
players = []
bus = dbus.SessionBus()
for service in bus.list_names():
if re.match('org.mpris.MediaPlayer2.', service):
player = bus.get_object(service, "/org/mpris/MediaPlayer2")
interface = dbus.Interface(player, 'org.freedesktop.DBus.Properties')
status = str(interface.Get('org.mpris.MediaPlayer2.Player', 'PlaybackStatus')).lower()
if status == "playing":
players.append(player)
return players
def __pause_players(players):
"""
Pause all playing media players using dbus.
"""
for player in players:
interface = dbus.Interface(player, dbus_interface='org.mpris.MediaPlayer2.Player')
interface.Pause()
def init(ctx: Context, plugin_config: dict) -> None:
"""
Initialize the screensaver plugin.
"""
logging.info('Initialize Media Control plugin')
global tray_icon_path
tray_icon_path = os.path.join(plugin_config['path'], "resource/pause.png")
def get_tray_action(break_obj: Break) -> Optional[TrayAction]:
"""
Return TrayAction only if there is a media player currently playing.
"""
players = __active_players()
if players:
return TrayAction.build("Pause media",
tray_icon_path,
Gtk.STOCK_MEDIA_PAUSE,
lambda: __pause_players(players))
return None

View File

@ -1,93 +1,92 @@
# # Safe Eyes is a utility to remind you to take break frequently
# # to protect your eyes from eye strain.
#
# # Copyright (C) 2017 Gobinath
#
# # This program is free software: you can redistribute it and/or modify
# # it under the terms of the GNU General Public License as published by
# # the Free Software Foundation, either version 3 of the License, or
# # (at your option) any later version.
#
# # This program is distributed in the hope that it will be useful,
# # but WITHOUT ANY WARRANTY; without even the implied warranty of
# # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# # GNU General Public License for more details.
#
# # You should have received a copy of the GNU General Public License
# # along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# import logging
#
# import gi
#
# from safeeyes.plugin_utils.spi import PluginResponse
# from safeeyes.breaks import BreakType
#
# gi.require_version('Notify', '0.7')
# from gi.repository import Notify
#
# """
# Safe Eyes Notification plugin
# """
#
# APPINDICATOR_ID = 'safeeyes'
# notification = None
# context = None
# warning_time = 10
#
# Notify.init(APPINDICATOR_ID)
#
#
# def init(ctx, safeeyes_config, plugin_config):
# """
# Initialize the plugin.
# """
# global context
# global warning_time
# logging.debug('Initialize Notification plugin')
# context = ctx
# warning_time = safeeyes_config.get('pre_break_warning_time')
#
#
# def on_pre_break(break_obj):
# """
# Show the notification
# """
# # Construct the message based on the type of the next break
# global notification
# logging.info('Show the notification')
# message = '\n'
# if break_obj.type == BreakType.SHORT:
# message += (_('Ready for a short break in %s seconds') % warning_time)
# else:
# message += (_('Ready for a long break in %s seconds') % warning_time)
#
# notification = Notify.Notification.new('Safe Eyes', message, icon='safeeyes_enabled')
# try:
# notification.show()
# except BaseException:
# logging.error('Failed to show the notification')
#
#
# def on_start_break(break_obj) -> PluginResponse:
# """
# Close the notification.
# """
# global notification
# logging.info('Close pre-break notification')
# if notification:
# try:
# notification.close()
# notification = None
# except BaseException:
# # Some operating systems automatically close the notification.
# pass
# return PluginResponse.CONTINUE
#
#
# def on_exit():
# """
# Uninitialize the registered notificaion.
# """
# logging.debug('Stop Notification plugin')
# Notify.uninit()
# Safe Eyes is a utility to remind you to take break frequently
# to protect your eyes from eye strain.
# Copyright (C) 2017 Gobinath
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import gi
from safeeyes.context import Context
from safeeyes.spi.breaks import BreakType, Break
gi.require_version('Notify', '0.7')
from gi.repository import Notify
"""
Safe Eyes Notification plugin
"""
APPINDICATOR_ID = 'safeeyes'
notification = None
context = None
warning_time = 10
Notify.init(APPINDICATOR_ID)
def init(ctx: Context, plugin_config: dict) -> None:
"""
Initialize the plugin.
"""
global context
global warning_time
logging.debug('Initialize Notification plugin')
context = ctx
warning_time = ctx.config.get('pre_break_warning_time')
def on_pre_break(break_obj: Break) -> None:
"""
Show the notification
"""
# Construct the message based on the type of the next break
global notification
logging.info('Show the notification')
message = '\n'
if break_obj.type == BreakType.SHORT:
message += (_('Ready for a short break in %s seconds') % warning_time)
else:
message += (_('Ready for a long break in %s seconds') % warning_time)
notification = Notify.Notification.new('Safe Eyes', message, icon='safeeyes_enabled')
try:
notification.show()
except BaseException:
logging.error('Failed to show the notification')
def on_start_break(break_obj: Break) -> None:
"""
Close the notification.
"""
global notification
logging.info('Close pre-break notification')
if notification:
try:
notification.close()
notification = None
except BaseException:
# Some operating systems automatically close the notification.
pass
def on_exit() -> None:
"""
Uninitialize the registered notificaion.
"""
logging.debug('Stop Notification plugin')
Notify.uninit()

View File

@ -1,142 +1,137 @@
# #!/usr/bin/env python
# # Safe Eyes is a utility to remind you to take break frequently
# # to protect your eyes from eye strain.
#
# # Copyright (C) 2017 Gobinath
#
# # This program is free software: you can redistribute it and/or modify
# # it under the terms of the GNU General Public License as published by
# # the Free Software Foundation, either version 3 of the License, or
# # (at your option) any later version.
#
# # This program is distributed in the hope that it will be useful,
# # but WITHOUT ANY WARRANTY; without even the implied warranty of
# # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# # GNU General Public License for more details.
#
# # You should have received a copy of the GNU General Public License
# # along with this program. If not, see <http://www.gnu.org/licenses/>.
# """
# Screensaver plugin locks the desktop using native screensaver application, after long breaks.
# """
#
# import logging
# import os
#
# import gi
#
# from safeeyes import utility
# from safeeyes.plugin_utils.spi import TrayAction
#
# gi.require_version('Gtk', '3.0')
# from gi.repository import Gtk
#
# context = None
# lock_screen = False
# user_locked_screen = False
# lock_screen_command = None
# min_seconds = 0
# seconds_passed = 0
# tray_icon_path = None
#
#
# def __lock_screen_command():
# """
# Function tries to detect the screensaver command based on the current envinroment
# Possible results:
# Gnome, Unity, Budgie: ['gnome-screensaver-command', '--lock']
# Cinnamon: ['cinnamon-screensaver-command', '--lock']
# Pantheon, LXDE: ['light-locker-command', '--lock']
# Mate: ['mate-screensaver-command', '--lock']
# KDE: ['qdbus', 'org.freedesktop.ScreenSaver', '/ScreenSaver', 'Lock']
# XFCE: ['xflock4']
# Otherwise: None
# """
# desktop_session = os.environ.get('DESKTOP_SESSION')
# current_desktop = os.environ.get('XDG_CURRENT_DESKTOP')
# if desktop_session is not None:
# desktop_session = desktop_session.lower()
# if ('xfce' in desktop_session or desktop_session.startswith('xubuntu') or (
# current_desktop is not None and 'xfce' in current_desktop)) and utility.command_exist('xflock4'):
# return ['xflock4']
# elif desktop_session == 'cinnamon' and utility.command_exist('cinnamon-screensaver-command'):
# return ['cinnamon-screensaver-command', '--lock']
# elif (desktop_session == 'pantheon' or desktop_session.startswith('lubuntu')) and utility.command_exist(
# 'light-locker-command'):
# return ['light-locker-command', '--lock']
# elif desktop_session == 'mate' and utility.command_exist('mate-screensaver-command'):
# return ['mate-screensaver-command', '--lock']
# elif desktop_session == 'kde' or 'plasma' in desktop_session or desktop_session.startswith(
# 'kubuntu') or os.environ.get('KDE_FULL_SESSION') == 'true':
# return ['qdbus', 'org.freedesktop.ScreenSaver', '/ScreenSaver', 'Lock']
# elif desktop_session in ['gnome', 'unity', 'budgie-desktop'] or desktop_session.startswith('ubuntu'):
# if utility.command_exist('gnome-screensaver-command'):
# return ['gnome-screensaver-command', '--lock']
# # From Gnome 3.8 no gnome-screensaver-command
# return ['dbus-send', '--type=method_call', '--dest=org.gnome.ScreenSaver', '/org/gnome/ScreenSaver',
# 'org.gnome.ScreenSaver.Lock']
# elif os.environ.get('GNOME_DESKTOP_SESSION_ID'):
# if 'deprecated' not in os.environ.get('GNOME_DESKTOP_SESSION_ID') and utility.command_exist(
# 'gnome-screensaver-command'):
# # Gnome 2
# return ['gnome-screensaver-command', '--lock']
# return None
#
#
# def __lock_screen():
# global user_locked_screen
# user_locked_screen = True
#
#
# def init(ctx, safeeyes_config, plugin_config):
# """
# Initialize the screensaver plugin.
# """
# global context
# global lock_screen_command
# global min_seconds
# global tray_icon_path
# logging.debug('Initialize Screensaver plugin')
# context = ctx
# min_seconds = plugin_config['min_seconds']
# tray_icon_path = os.path.join(plugin_config['path'], "resource/lock.png")
# if plugin_config['command']:
# lock_screen_command = plugin_config['command'].split()
# else:
# lock_screen_command = __lock_screen_command()
#
#
# def on_start_break(break_obj):
# """
# Determine the break type and only if it is a long break, enable the lock_screen flag.
# """
# global lock_screen
# global seconds_passed
# global user_locked_screen
# user_locked_screen = False
# seconds_passed = 0
# if lock_screen_command:
# lock_screen = break_obj.is_long_break()
#
#
# def on_countdown(countdown, seconds):
# """
# Keep track of seconds passed from the beginning of long break.
# """
# global seconds_passed
# seconds_passed = seconds
#
#
# def on_stop_break():
# """
# Lock the screen after a long break if the user has not skipped within min_seconds.
# """
# if user_locked_screen or (lock_screen and seconds_passed >= min_seconds):
# utility.execute_command(lock_screen_command)
#
#
# def get_tray_action(break_obj):
# return TrayAction.build("Lock screen",
# tray_icon_path,
# Gtk.STOCK_DIALOG_AUTHENTICATION,
# __lock_screen)
#!/usr/bin/env python
# Safe Eyes is a utility to remind you to take break frequently
# to protect your eyes from eye strain.
# Copyright (C) 2017 Gobinath
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Screensaver plugin locks the desktop using native screensaver application, after long breaks.
"""
import logging
import os
from typing import Optional, List
import gi
from safeeyes import utility
from safeeyes.context import Context
from safeeyes.env import system
from safeeyes.spi.breaks import Break
from safeeyes.spi.plugin import TrayAction
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
class Screensaver:
def __init__(self, ctx: Context, config: dict):
self.min_seconds: int = config['min_seconds']
self.__tray_icon_path: str = os.path.join(config['path'], "resource/lock.png")
self.__command: List[str] = config['command'].split() if config[
'command'] else Screensaver.__lock_screen_command(ctx.env.name)
self.__lock_required = False
def reset(self) -> None:
self.__lock_required = False
def lock_later(self) -> None:
self.__lock_required = True
def lock_if_required(self) -> None:
if self.__command is not None and self.__lock_required:
system.execute(self.__command)
def is_enabled(self) -> bool:
return self.__command is not None
@staticmethod
def __lock_screen_command(desktop: str):
"""
Function tries to detect the screensaver command based on the current environment
Possible results:
Gnome, Unity, Budgie: ['gnome-screensaver-command', '--lock']
Cinnamon: ['cinnamon-screensaver-command', '--lock']
Pantheon, LXDE: ['light-locker-command', '--lock']
Mate: ['mate-screensaver-command', '--lock']
KDE: ['qdbus', 'org.freedesktop.ScreenSaver', '/ScreenSaver', 'Lock']
XFCE: ['xflock4']
Otherwise: None
"""
if desktop is not None:
if desktop == 'xfce' and utility.command_exist('xflock4'):
return ['xflock4']
elif desktop == 'cinnamon' and utility.command_exist('cinnamon-screensaver-command'):
return ['cinnamon-screensaver-command', '--lock']
elif (desktop == 'pantheon' or desktop == 'lxde') and utility.command_exist('light-locker-command'):
return ['light-locker-command', '--lock']
elif desktop == 'mate' and utility.command_exist('mate-screensaver-command'):
return ['mate-screensaver-command', '--lock']
elif desktop == 'kde':
return ['qdbus', 'org.freedesktop.ScreenSaver', '/ScreenSaver', 'Lock']
elif desktop in ['gnome', 'unity', 'budgie-desktop']:
if utility.command_exist('gnome-screensaver-command'):
return ['gnome-screensaver-command', '--lock']
# From Gnome 3.8 no gnome-screensaver-command
return ['dbus-send', '--type=method_call', '--dest=org.gnome.ScreenSaver', '/org/gnome/ScreenSaver',
'org.gnome.ScreenSaver.Lock']
return None
screensaver: Screensaver
tray_icon_path = None
def init(ctx: Context, plugin_config: dict) -> None:
"""
Initialize the screensaver plugin.
"""
logging.debug('Initialize Screensaver plugin')
global screensaver
global tray_icon_path
screensaver = Screensaver(ctx, plugin_config)
tray_icon_path = os.path.join(plugin_config['path'], "resource/lock.png")
def on_start_break(break_obj: Break) -> None:
"""
Determine the break type and only if it is a long break, enable the lock_screen flag.
"""
screensaver.reset()
def on_count_down(break_obj: Break, countdown: int, seconds: int) -> None:
"""
Keep track of seconds passed from the beginning of long break.
"""
if break_obj.is_long_break() and seconds >= screensaver.min_seconds:
screensaver.lock_later()
def on_stop_break(break_obj: Break, skipped: bool, postponed: bool) -> None:
"""
Lock the screen after a long break if the user has not skipped within min_seconds.
"""
screensaver.lock_if_required()
def get_tray_action(break_obj: Break) -> Optional[TrayAction]:
if screensaver.is_enabled():
return TrayAction.build("Lock screen",
tray_icon_path,
Gtk.STOCK_DIALOG_AUTHENTICATION,
screensaver.lock_later)
else:
return None

View File

@ -1,237 +1,240 @@
# # Safe Eyes is a utility to remind you to take break frequently
# # to protect your eyes from eye strain.
#
# # Copyright (C) 2017 Gobinath
#
# # This program is free software: you can redistribute it and/or modify
# # it under the terms of the GNU General Public License as published by
# # the Free Software Foundation, either version 3 of the License, or
# # (at your option) any later version.
#
# # This program is distributed in the hope that it will be useful,
# # but WITHOUT ANY WARRANTY; without even the implied warranty of
# # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# # GNU General Public License for more details.
#
# # You should have received a copy of the GNU General Public License
# # along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# import datetime
# import logging
# import re
# import subprocess
# import threading
#
# from safeeyes import utility
# from safeeyes.plugin_utils.spi import PluginResponse
# from safeeyes.models.state import State
#
# """
# Safe Eyes smart pause plugin
# """
#
# context = None
# idle_condition = threading.Condition()
# lock = threading.Lock()
# active = False
# idle_time = 0
# enable_safe_eyes = None
# disable_safe_eyes = None
# smart_pause_activated = False
# idle_start_time = None
# next_break_time = None
# next_break_duration = 0
# short_break_interval = 0
# waiting_time = 2
# interpret_idle_as_break = False
# is_wayland_and_gnome = False
#
#
# def __gnome_wayland_idle_time():
# """
# Determine system idle time in seconds, specifically for gnome with wayland.
# If there's a failure, return 0.
# https://unix.stackexchange.com/a/492328/222290
# """
# try:
# output = subprocess.check_output([
# 'dbus-send',
# '--print-reply',
# '--dest=org.gnome.Mutter.IdleMonitor',
# '/org/gnome/Mutter/IdleMonitor/Core',
# 'org.gnome.Mutter.IdleMonitor.GetIdletime'
# ])
# return int(re.search(rb'\d+$', output).group(0)) / 1000
# except BaseException as e:
# logging.warning("Failed to get system idle time for gnome/wayland.")
# logging.warning(str(e))
# return 0
#
#
# def __system_idle_time():
# """
# Get system idle time in minutes.
# Return the idle time if xprintidle is available, otherwise return 0.
# """
# try:
# if is_wayland_and_gnome:
# return __gnome_wayland_idle_time()
# # Convert to seconds
# return int(subprocess.check_output(['xprintidle']).decode('utf-8')) / 1000
# except BaseException:
# return 0
#
#
# def __is_active():
# """
# Thread safe function to see if this plugin is active or not.
# """
# is_active = False
# with lock:
# is_active = active
# return is_active
#
#
# def __set_active(is_active):
# """
# Thread safe function to change the state of the plugin.
# """
# global active
# with lock:
# active = is_active
#
#
# def init(ctx, safeeyes_config, plugin_config):
# """
# Initialize the plugin.
# """
# global context
# global enable_safe_eyes
# global disable_safe_eyes
# global postpone
# global idle_time
# global short_break_interval
# global long_break_duration
# global waiting_time
# global interpret_idle_as_break
# global postpone_if_active
# global is_wayland_and_gnome
# logging.debug('Initialize Smart Pause plugin')
# context = ctx
# enable_safe_eyes = context['api']['enable_safeeyes']
# disable_safe_eyes = context['api']['disable_safeeyes']
# postpone = context['api']['postpone']
# idle_time = plugin_config['idle_time']
# interpret_idle_as_break = plugin_config['interpret_idle_as_break']
# postpone_if_active = plugin_config['postpone_if_active']
# short_break_interval = safeeyes_config.get(
# 'short_break_interval') * 60 # Convert to seconds
# long_break_duration = safeeyes_config.get('long_break_duration')
# waiting_time = min(2, idle_time) # If idle time is 1 sec, wait only 1 sec
# is_wayland_and_gnome = context['desktop'] == 'gnome' and context['is_wayland']
#
#
# def __start_idle_monitor():
# """
# Continuously check the system idle time and pause/resume Safe Eyes based on it.
# """
# global smart_pause_activated
# global idle_start_time
#
# while __is_active():
# # Wait for waiting_time seconds
# idle_condition.acquire()
# idle_condition.wait(waiting_time)
# idle_condition.release()
#
# if __is_active():
# # Get the system idle time
# system_idle_time = __system_idle_time()
# if system_idle_time >= idle_time and context['state'] == State.WAITING:
# smart_pause_activated = True
# idle_start_time = datetime.datetime.now() - datetime.timedelta(seconds=system_idle_time)
# logging.info('Pause Safe Eyes due to system idle')
# disable_safe_eyes(None)
# elif system_idle_time < idle_time and context['state'] == State.STOPPED and idle_start_time is not None:
# logging.info('Resume Safe Eyes due to user activity')
# smart_pause_activated = False
# idle_period = (datetime.datetime.now() - idle_start_time)
# idle_seconds = idle_period.total_seconds()
# context['idle_period'] = idle_seconds
# if interpret_idle_as_break and idle_seconds >= next_break_duration:
# # User is idle for break duration and wants to consider it as a break
# logging.debug("Idle for %d seconds, long break %d", idle_seconds, long_break_duration)
# enable_safe_eyes(-1, idle_seconds >= long_break_duration)
# elif idle_seconds < short_break_interval:
# # Credit back the idle time
# if next_break_time is not None:
# # This method runs in a thread since the start.
# # It may run before next_break is initialized in the update_next_break method
# next_break = next_break_time + idle_period
# enable_safe_eyes(next_break.timestamp())
# else:
# enable_safe_eyes()
# else:
# # User is idle for more than the time between two breaks
# enable_safe_eyes()
#
#
# def on_start():
# """
# Start a thread to continuously call xprintidle.
# """
# global active
# if not __is_active():
# # If SmartPause is already started, do not start it again
# logging.debug('Start Smart Pause plugin')
# __set_active(True)
# utility.start_thread(__start_idle_monitor)
#
#
# def on_stop():
# """
# Stop the thread from continuously calling xprintidle.
# """
# global active
# global smart_pause_activated
# if smart_pause_activated:
# # Safe Eyes is stopped due to system idle
# smart_pause_activated = False
# return
# logging.debug('Stop Smart Pause plugin')
# __set_active(False)
# idle_condition.acquire()
# idle_condition.notify_all()
# idle_condition.release()
#
#
# def update_next_break(break_obj, next_short_break_time, next_long_break_time):
# """
# Update the next break time.
# """
# dateTime = next_short_break_time if next_short_break_time < next_long_break_time else next_long_break_time
# global next_break_time
# global next_break_duration
# next_break_time = dateTime
# next_break_duration = break_obj.duration
#
#
# def on_start_break(break_obj) -> PluginResponse:
# """
# Lifecycle method executes just before the break.
# """
# if postpone_if_active:
# # Postpone this break if the user is active
# system_idle_time = __system_idle_time()
# if system_idle_time < 2:
# postpone(2) # Postpone for 2 seconds
# return PluginResponse.POSTPONE
# return PluginResponse.CONTINUE
#
#
# def disable():
# """
# SmartPause plugin was active earlier but now user has disabled it.
# """
# # Remove the idle_period
# context.pop('idle_period', None)
# Safe Eyes is a utility to remind you to take break frequently
# to protect your eyes from eye strain.
# Copyright (C) 2017 Gobinath
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import logging
import re
import subprocess
import threading
from typing import Optional
from safeeyes.context import Context
from safeeyes.spi.breaks import Break
from safeeyes.spi.plugin import BreakAction
from safeeyes.spi.state import State
from safeeyes.thread import ThreadCondition, worker
class SmartPause:
def __init__(self, context: Context, config: dict):
self.__context = context
self.__lock: threading.Lock = threading.Lock()
self.__condition: ThreadCondition = context.thread_api.new_condition()
self.__postpone_if_active = config['postpone_if_active']
self.__idle_time = config['idle_time']
self.__interpret_idle_as_break = config['interpret_idle_as_break']
self.__short_break_interval = context.config.get('short_break_interval') * 60 # Convert to seconds
self.__long_break_duration = context.config.get('long_break_duration')
self.__waiting_time = min(2, self.__idle_time) # If idle time is 1 sec, wait only 1 sec
self.__is_wayland_and_gnome = context.env.name == 'gnome' and context.env.is_wayland()
self.__active: bool = False
self.__smart_pause_activated: bool = False
self.__next_break_time: datetime.datetime = None
self.__next_break_duration: int = 0
self.__idle_start_time: Optional[datetime.datetime] = None
def start(self) -> None:
if not self.__is_active():
# If SmartPause is already started, do not start it again
logging.debug('Start Smart Pause plugin')
self.__set_active(True)
self.__start_idle_monitor()
def stop(self) -> None:
if self.__smart_pause_activated:
# Safe Eyes is stopped due to system idle
self.__smart_pause_activated = False
return
logging.debug('Stop Smart Pause plugin')
self.__set_active(False)
self.__condition.release_all()
def set_next_break(self, break_obj: Break, date_time: datetime.datetime) -> None:
"""
Update the next break time.
"""
self.__next_break_time = date_time
self.__next_break_duration = break_obj.duration
def should_postpone(self) -> bool:
if self.__postpone_if_active:
# Postpone this break if the user is active
system_idle_time = self.__system_idle_time()
return system_idle_time < 2
return False
def clean(self) -> None:
session_config = self.__context.session.get_plugin('smartpause')
session_config.pop('idle_period', None)
def __system_idle_time(self) -> int:
if self.__is_wayland_and_gnome:
return SmartPause.__gnome_wayland_idle_time()
else:
return SmartPause.__xorg_idle_time()
def __is_active(self):
"""
Thread safe function to see if this plugin is active or not.
"""
is_active = False
with self.__lock:
is_active = self.__active
return is_active
def __set_active(self, is_active):
"""
Thread safe function to change the state of the plugin.
"""
with self.__lock:
self.__active = is_active
@worker
def __start_idle_monitor(self):
"""
Continuously check the system idle time and pause/resume Safe Eyes based on it.
"""
while self.__is_active():
# Wait for waiting_time seconds
self.__condition.hold(self.__waiting_time)
if self.__is_active():
# Get the system idle time
system_idle_time = self.__system_idle_time()
if system_idle_time >= self.__idle_time and self.__context.state == State.WAITING:
self.__smart_pause_activated = True
self.__idle_start_time = datetime.datetime.now() - datetime.timedelta(seconds=system_idle_time)
logging.info('Pause Safe Eyes due to system idle')
self.__context.core_api.stop()
elif system_idle_time < self.__idle_time and self.__context.state == State.STOPPED and self.__idle_start_time is not None:
logging.info('Resume Safe Eyes due to user activity')
self.__smart_pause_activated = False
idle_period = (datetime.datetime.now() - self.__idle_start_time)
idle_seconds = idle_period.total_seconds()
session_config = self.__context.session.get_plugin('smartpause')
session_config['idle_period'] = idle_seconds
self.__context.session.set_plugin('smartpause', session_config)
if self.__interpret_idle_as_break and idle_seconds >= self.__next_break_duration:
# User is idle for break duration and wants to consider it as a break
logging.debug("Idle for %d seconds, long break %d", idle_seconds, self.__long_break_duration)
self.__context.core_api.start(
reset_breaks=(idle_seconds >= self.__long_break_duration))
elif idle_seconds < self.__short_break_interval:
# Credit back the idle time
if self.__next_break_time is not None:
# This method runs in a thread since the start.
# It may run before next_break is initialized in the update_next_break method
next_break = self.__next_break_time + idle_period
self.__context.core_api.start(next_break)
else:
self.__context.core_api.start()
else:
# User is idle for more than the time between two breaks
self.__context.core_api.start()
@staticmethod
def __xorg_idle_time():
"""
Get system idle time in minutes.
Return the idle time if xprintidle is available, otherwise return 0.
"""
try:
# Convert to seconds
return int(subprocess.check_output(['xprintidle']).decode('utf-8')) / 1000
except BaseException as e:
logging.warning("Failed to get system idle time for xorg.")
logging.warning(str(e))
return 0
@staticmethod
def __gnome_wayland_idle_time():
"""
Determine system idle time in seconds, specifically for gnome with wayland.
If there's a failure, return 0.
https://unix.stackexchange.com/a/492328/222290
"""
try:
output = subprocess.check_output([
'dbus-send',
'--print-reply',
'--dest=org.gnome.Mutter.IdleMonitor',
'/org/gnome/Mutter/IdleMonitor/Core',
'org.gnome.Mutter.IdleMonitor.GetIdletime'
])
return int(re.search(rb'\d+$', output).group(0)) / 1000
except BaseException as e:
logging.warning("Failed to get system idle time for gnome/wayland.")
logging.warning(str(e))
return 0
smart_pause: SmartPause
def init(context: Context, plugin_config: dict):
"""
Initialize the plugin.
"""
logging.info('Initialize Smart Pause plugin')
global smart_pause
smart_pause = SmartPause(context, plugin_config)
def on_start() -> None:
"""
Start a thread to continuously call xprintidle.
"""
smart_pause.start()
def on_stop():
"""
Stop the thread from continuously calling xprintidle.
"""
smart_pause.stop()
def update_next_break(break_obj: Break, next_short_break: datetime.datetime,
next_long_break: datetime.datetime) -> None:
"""
Update the next break time.
"""
date_time = next_short_break if next_short_break < next_long_break else next_long_break
smart_pause.set_next_break(break_obj, date_time)
def get_break_action(break_obj: Break) -> Optional[BreakAction]:
"""
Called just before on_pre_break and on_start_break.
This is the opportunity for plugins to skip/postpone a break.
None means BreakAction.allow()
"""
if smart_pause.should_postpone():
return BreakAction.postpone(2)
return BreakAction.allow()
def disable():
"""
SmartPause plugin was active earlier but now user has disabled it.
"""
# Remove the idle_period
smart_pause.clean()

View File

@ -23,7 +23,7 @@ import gi
from safeeyes.context import Context
from safeeyes.spi.breaks import BreakType
from safeeyes.thread import worker
from safeeyes.thread import worker, main
gi.require_version('Gtk', '3.0')
gi.require_version('AppIndicator3', '0.1')
@ -245,7 +245,7 @@ class TrayIcon:
self.__idle_condition.acquire()
self.__idle_condition.notify_all()
self.__idle_condition.release()
self.__context.core_api.quit_safe_eyes()
self.__context.core_api.quit()
def show_settings(self, *args):
"""
@ -276,7 +276,7 @@ class TrayIcon:
formatted_time = utility.format_time(self.__date_time)
message = _('Next break at %s') % (formatted_time)
# Update the menu item label
utility.execute_main_thread(self.item_info.set_label, message)
self.__set_label(message)
# Update the tray icon label
if self.__config.get('show_time_in_tray', False):
self.indicator.set_label(formatted_time, '')
@ -302,7 +302,7 @@ class TrayIcon:
if not self.__active:
with self.lock:
self.enable_ui()
self.__context.core_api.enable_safe_eyes()
self.__context.core_api.start()
# Notify all schedulers
self.__idle_condition.acquire()
self.__idle_condition.notify_all()
@ -320,13 +320,13 @@ class TrayIcon:
time_to_wait = args[1]
if time_to_wait <= 0:
info = _('Disabled until restart')
self.__context.core_api.disable_safe_eyes(info)
self.__context.core_api.stop(info)
self.__wakeup_time = None
self.item_info.set_label(info)
else:
self.__wakeup_time = datetime.datetime.now() + datetime.timedelta(minutes=time_to_wait)
info = _('Disabled until %s') % utility.format_time(self.__wakeup_time)
self.__context.core_api.disable_safe_eyes(info)
self.__context.core_api.stop(info)
self.item_info.set_label(info)
self.__schedule_resume(time_to_wait)
@ -383,15 +383,15 @@ class TrayIcon:
with self.lock:
if not self.__active:
utility.execute_main_thread(self.item_enable.activate)
self.__activate_enable_menu()
@worker
def start_animation(self):
if not self.__active or not self.__animate:
return
utility.execute_main_thread(lambda: self.indicator.set_icon("safeeyes_disabled"))
self.__set_icon("safeeyes_disabled")
time.sleep(0.5)
utility.execute_main_thread(lambda: self.indicator.set_icon("safeeyes_enabled"))
self.__set_icon("safeeyes_enabled")
if self.__animate and self.__active:
time.sleep(0.5)
if self.__animate and self.__active:
@ -400,9 +400,21 @@ class TrayIcon:
def stop_animation(self):
self.__animate = False
if self.__active:
utility.execute_main_thread(lambda: self.indicator.set_icon("safeeyes_enabled"))
self.__set_icon("safeeyes_enabled")
else:
utility.execute_main_thread(lambda: self.indicator.set_icon("safeeyes_disabled"))
self.__set_icon("safeeyes_disabled")
@main
def __set_icon(self, icon_name: str) -> None:
self.indicator.set_icon(icon_name)
@main
def __activate_enable_menu(self) -> None:
self.item_enable.activate()
@main
def __set_label(self, message: str) -> None:
self.item_info.set_label(message)
context: Context = None

View File

@ -21,6 +21,7 @@ SafeEyes connects all the individual components and provide the complete applica
"""
import atexit
import datetime
import logging
import sys
@ -28,7 +29,6 @@ import dbus
import gi
from dbus.mainloop.glib import DBusGMainLoop
from safeeyes import utility
from safeeyes.breaks.scheduler import BreakScheduler
from safeeyes.config import Config
from safeeyes.context import Context
@ -53,17 +53,20 @@ class SafeEyes(CoreAPI):
self.__context: Context = Context(config, locale.init_locale())
self.__plugin_loader = PluginLoader()
self.__heartbeat = Heartbeat(self.__context)
self.__plugin_manager: PluginManager = PluginManager(self.__plugin_loader.load(config))
self.__plugin_manager: PluginManager = PluginManager(self.__plugin_loader.load(self.__context))
self.__scheduler: BreakScheduler = BreakScheduler(self.__context, self.__heartbeat, self.__plugin_manager)
self.__ui_manager: UIManager = UIManager(self.__context, self.__on_config_changed)
self.__active = False
self.__context.set_apis(self, self.__ui_manager, self.__scheduler, self.__plugin_manager)
self.__context.set_apis(self, self.__heartbeat, self.__ui_manager, self.__scheduler, self.__plugin_manager)
self.__plugin_manager.init(self.__context)
# Save the session on exit
atexit.register(self.__persist_session)
def start(self):
def start(self, scheduled_next_break_time: datetime.datetime = None, reset_breaks=False):
"""
Listen to tray icon enable action and send the signal to core.
"""
"""
Start Safe Eyes
"""
@ -72,10 +75,14 @@ class SafeEyes(CoreAPI):
self.__active = True
self.__context.state = State.START
self.__plugin_manager.on_start() # Call the start method of all plugins
self.__scheduler.start()
# todo: reset breaks
self.__scheduler.start(scheduled_next_break_time)
self.__handle_system_suspend()
def stop(self):
"""
Listen to tray icon disable action and send the signal to core.
"""
"""
Stop Safe Eyes
"""
@ -90,7 +97,7 @@ class SafeEyes(CoreAPI):
self.__persist_session()
@main
def quit_safe_eyes(self):
def quit(self):
self.stop()
logging.info("Quit safe eyes")
self.__context.state = State.QUIT
@ -99,26 +106,11 @@ class SafeEyes(CoreAPI):
# os._exit(0)
sys.exit(0)
def enable_safe_eyes(self, scheduled_next_break_time=-1, reset_breaks=False):
"""
Listen to tray icon enable action and send the signal to core.
"""
self.start()
def disable_safe_eyes(self):
"""
Listen to tray icon disable action and send the signal to core.
"""
self.stop()
def __persist_session(self):
"""
Save the session object to the session file.
"""
if self.__context.config.get('persist_state'):
utility.write_json(utility.SESSION_FILE_PATH, self.__context.session)
else:
utility.delete(utility.SESSION_FILE_PATH)
self.__context.session.save(self.__context.config.get('persist_state', False))
def __start_rpc_server(self):
# if self.rpc_server is None:
@ -144,10 +136,10 @@ class SafeEyes(CoreAPI):
# Restart the core and initialize the components
self.__context.config = config
self.__plugin_manager = PluginManager(self.__plugin_loader.load(config))
self.__plugin_manager = PluginManager(self.__plugin_loader.load(self.__context))
self.__scheduler = BreakScheduler(self.__context, self.__heartbeat, self.__plugin_manager)
self.__plugin_manager.init(self.__context)
self.__context.set_apis(self, self.__ui_manager, self.__scheduler, self.__plugin_manager)
self.__context.set_apis(self, self.__heartbeat, self.__ui_manager, self.__scheduler, self.__plugin_manager)
if is_active:
self.start()

View File

@ -36,13 +36,13 @@ class CoreAPI(abc.ABC):
def get_status(self) -> str:
return self.__status
def enable_safe_eyes(self, next_break_time: int = -1, reset_breaks: bool = False):
def start(self, next_break_time: datetime = None, reset_breaks: bool = False):
pass
def disable_safe_eyes(self):
def stop(self):
pass
def quit_safe_eyes(self):
def quit(self):
pass
@ -58,8 +58,7 @@ class BreakAPI(abc.ABC):
pass
def postpone(self, duration: int) -> None:
next_break_time = datetime.datetime.now() + datetime.timedelta(seconds=duration)
self.schedule(next_break_time)
pass
def schedule(self, next_break_time: datetime) -> None:
pass
@ -87,3 +86,26 @@ class PluginAPI(abc.ABC):
def get_tray_actions(self, break_obj: Break) -> List[TrayAction]:
pass
class Condition(abc.ABC):
def hold(self, timeout: int) -> None:
pass
def release_all(self) -> None:
pass
class ThreadAPI(abc.ABC):
def release_all(self) -> None:
pass
def restart(self) -> None:
pass
def hold(self, timeout: int) -> None:
pass
def new_condition(self) -> Condition:
pass

View File

@ -71,10 +71,33 @@ class Widget:
self.content: str = content
def is_empty(self) -> bool:
return self.title is not None and self.content is not None and len(self.title) > 0 and len(self.content) > 0
return self.title is None or self.content is None
def format(self) -> str:
if self.is_empty():
return ''
else:
return '<b>{}</b>\n{}\n{}\n\n\n'.format(self.title, WIDGET_HORIZONTAL_LINE, self.content)
class BreakAction:
def __init__(self, skipped: bool, postponed: bool, postpone_duration: int):
self.skipped = skipped
self.postponed = postponed
self.postpone_duration = postpone_duration
def not_allowed(self) -> bool:
return self.skipped or self.postponed
@staticmethod
def allow():
return BreakAction(False, False, -1)
@staticmethod
def skip():
return BreakAction(True, False, -1)
@staticmethod
def postpone(duration: int = -1):
return BreakAction(False, True, duration)

View File

@ -20,6 +20,7 @@ import time
from typing import List, Optional, Callable
from safeeyes.context import Context
from safeeyes.spi.api import ThreadAPI, Condition
from safeeyes.spi.state import State
@ -41,7 +42,7 @@ def worker(fun):
return run
class ThreadCondition:
class ThreadCondition(Condition):
def __init__(self):
self.__waiting_condition: threading.Condition = threading.Condition()
@ -60,7 +61,7 @@ class ThreadCondition:
self.__waiting_condition.release()
class Heartbeat:
class Heartbeat(ThreadAPI):
def __init__(self, context: Context):
self.__waiting_condition: threading.Condition = threading.Condition()
self.__context: Context = context
@ -68,13 +69,13 @@ class Heartbeat:
self.lock = threading.Lock()
self.__conditions: List[ThreadCondition] = []
def start(self):
def start(self) -> None:
"""
Stop the heartbeat.
"""
self.running = True
def stop(self):
def stop(self) -> None:
"""
Stop the heartbeat.
"""
@ -85,7 +86,7 @@ class Heartbeat:
self.__waiting_condition.notify_all()
self.__waiting_condition.release()
def release_all(self):
def release_all(self) -> None:
"""
Release all waiting threads so that they will continue executing their remaining tasks.
"""
@ -96,7 +97,7 @@ class Heartbeat:
self.__waiting_condition.notify_all()
self.__waiting_condition.release()
def restart(self):
def restart(self) -> None:
self.__waiting_condition.acquire()
self.running = False
self.__waiting_condition.notify_all()
@ -104,7 +105,7 @@ class Heartbeat:
time.sleep(1) # Wait for 1 sec to ensure the scheduler is dead
self.running = True
def hold(self, timeout: int):
def hold(self, timeout: int) -> None:
self.__waiting_condition.acquire()
self.__waiting_condition.wait(timeout)
self.__waiting_condition.release()
@ -120,13 +121,13 @@ class Timer:
def __init__(self, context: Context, heartbeat: Heartbeat, on_timeout: Callable[[], None]):
self.__context = context
import datetime
self.__next_schedule: Optional[datetime] = None
self.__next_schedule: Optional[datetime.datetime] = None
self.__heartbeat: Heartbeat = heartbeat
self.__on_timeout: Callable[[], None] = on_timeout
self.__condition: ThreadCondition = heartbeat.new_condition()
self.__running = False
def schedule(self, next_break_at: datetime):
def schedule(self, next_break_at: datetime.datetime):
self.__next_schedule = next_break_at
with self.__heartbeat.lock:

View File

@ -39,7 +39,7 @@ class UIManager(WindowAPI):
if not self.__settings_dialog_visible:
logging.info("Show settings dialog")
self.__settings_dialog_visible = True
settings_dialog = SettingsDialog(Config.from_json(), self.__save_settings)
settings_dialog = SettingsDialog(self.__context, Config.from_json(), self.__save_settings)
settings_dialog.show()
@main
@ -51,7 +51,7 @@ class UIManager(WindowAPI):
about_dialog = AboutDialog(self.__context.version)
about_dialog.show()
def __save_settings(self, config):
def __save_settings(self, config: Config):
"""
Listen to Settings dialog Save action and write to the config file.
"""

View File

@ -18,16 +18,19 @@
import math
import os
from typing import Callable
import gi
from safeeyes import utility
from safeeyes.config import Config
from safeeyes.context import Context
from safeeyes.plugin_utils.loader import PluginLoader
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GdkPixbuf
SETTINGS_DIALOG_GLADE = os.path.join(utility.BIN_DIRECTORY, "glade/settings_dialog.glade")
SETTINGS_DIALOG_PLUGIN_GLADE = os.path.join(utility.BIN_DIRECTORY, "glade/settings_plugin.glade")
SETTINGS_DIALOG_BREAK_GLADE = os.path.join(utility.BIN_DIRECTORY, "glade/settings_break.glade")
@ -44,9 +47,10 @@ class SettingsDialog:
Create and initialize SettingsDialog instance.
"""
def __init__(self, config, on_save_settings):
self.config = config
self.on_save_settings = on_save_settings
def __init__(self, context: Context, config: Config, on_save_settings: Callable[[Config], None]):
self.__context: Context = context
self.__config: Config = config
self.__on_save_settings: Callable[[Config], None] = on_save_settings
self.plugin_switches = {}
self.plugin_map = {}
self.last_short_break_interval = config.get('short_break_interval')
@ -95,9 +99,9 @@ class SettingsDialog:
for long_break in config.get('long_breaks'):
self.__create_break_item(long_break, False)
for plugin_config in utility.load_plugins_config(config):
for plugin_config in PluginLoader.load_plugins_config(self.__context, config):
self.box_plugins.pack_start(self.__create_plugin_item(plugin_config), False, False, 0)
self.spin_short_break_duration.set_value(config.get('short_break_duration'))
self.spin_long_break_duration.set_value(config.get('long_break_duration'))
self.spin_short_break_interval.set_value(config.get('short_break_interval'))
@ -125,7 +129,7 @@ class SettingsDialog:
lambda button: self.__show_break_properties_dialog(
break_config,
is_short,
self.config,
self.__config,
lambda cfg: lbl_name.set_label(_(cfg['name'])),
lambda is_short, break_config: self.__create_break_item(break_config, is_short),
lambda: parent_box.remove(box)
@ -146,17 +150,18 @@ class SettingsDialog:
def on_reset_menu_clicked(self, button):
self.popover.hide()
def __confirmation_dialog_response(widget, response_id):
if response_id == Gtk.ResponseType.OK:
utility.reset_config()
self.config = Config()
self.__config = Config()
# Remove breaks from the container
self.box_short_breaks.foreach(lambda element: self.box_short_breaks.remove(element))
self.box_long_breaks.foreach(lambda element: self.box_long_breaks.remove(element))
# Remove plugins from the container
self.box_plugins.foreach(lambda element: self.box_plugins.remove(element))
# Initialize again
self.__initialize(self.config)
self.__initialize(self.__config)
widget.destroy()
messagedialog = Gtk.MessageDialog(parent=self.window,
@ -177,9 +182,9 @@ class SettingsDialog:
def __confirmation_dialog_response(widget, response_id):
if response_id == Gtk.ResponseType.OK:
if is_short:
self.config.get('short_breaks').remove(break_config)
self.__config.get('short_breaks').remove(break_config)
else:
self.config.get('long_breaks').remove(break_config)
self.__config.get('long_breaks').remove(break_config)
on_remove()
widget.destroy()
@ -253,7 +258,8 @@ class SettingsDialog:
long_break_interval = self.spin_long_break_interval.get_value_as_int()
self.spin_long_break_interval.set_range(short_break_interval * 2, 120)
self.spin_long_break_interval.set_increments(short_break_interval, short_break_interval * 2)
self.spin_long_break_interval.set_value(short_break_interval * math.ceil(long_break_interval / self.last_short_break_interval))
self.spin_long_break_interval.set_value(
short_break_interval * math.ceil(long_break_interval / self.last_short_break_interval))
self.last_short_break_interval = short_break_interval
if not self.initializing and not self.infobar_long_break_shown:
self.infobar_long_break_shown = True
@ -294,26 +300,27 @@ class SettingsDialog:
"""
Event handler for add break button.
"""
dialog = NewBreakDialog(self.config, lambda is_short, break_config: self.__create_break_item(break_config, is_short))
dialog = NewBreakDialog(self.__config,
lambda is_short, break_config: self.__create_break_item(break_config, is_short))
dialog.show()
def on_window_delete(self, *args):
"""
Event handler for Settings dialog close action.
"""
self.config.set('short_break_duration', self.spin_short_break_duration.get_value_as_int())
self.config.set('long_break_duration', self.spin_long_break_duration.get_value_as_int())
self.config.set('short_break_interval', self.spin_short_break_interval.get_value_as_int())
self.config.set('long_break_interval', self.spin_long_break_interval.get_value_as_int())
self.config.set('pre_break_warning_time', self.spin_time_to_prepare.get_value_as_int())
self.config.set('random_order', self.switch_random_order.get_active())
self.config.set('persist_state', self.switch_persist.get_active())
self.config.set('use_rpc_server', self.switch_rpc_server.get_active())
for plugin in self.config.get('plugins'):
self.__config.set('short_break_duration', self.spin_short_break_duration.get_value_as_int())
self.__config.set('long_break_duration', self.spin_long_break_duration.get_value_as_int())
self.__config.set('short_break_interval', self.spin_short_break_interval.get_value_as_int())
self.__config.set('long_break_interval', self.spin_long_break_interval.get_value_as_int())
self.__config.set('pre_break_warning_time', self.spin_time_to_prepare.get_value_as_int())
self.__config.set('random_order', self.switch_random_order.get_active())
self.__config.set('persist_state', self.switch_persist.get_active())
self.__config.set('use_rpc_server', self.switch_rpc_server.get_active())
for plugin in self.__config.get('plugins'):
if plugin['id'] in self.plugin_switches:
plugin['enabled'] = self.plugin_switches[plugin['id']].get_active()
self.on_save_settings(self.config) # Call the provided save method
self.__on_save_settings(self.__config) # Call the provided save method
self.window.destroy()
@ -333,11 +340,15 @@ class PluginSettingsDialog:
self.window.set_title(_('Plugin Settings'))
for setting in config.get('settings'):
if setting['type'].upper() == 'INT':
box_settings.pack_start(self.__load_int_item(setting['label'], setting['id'], setting['safeeyes_config'], setting.get('min', 0), setting.get('max', 120)), False, False, 0)
box_settings.pack_start(
self.__load_int_item(setting['label'], setting['id'], setting['safeeyes_config'],
setting.get('min', 0), setting.get('max', 120)), False, False, 0)
elif setting['type'].upper() == 'TEXT':
box_settings.pack_start(self.__load_text_item(setting['label'], setting['id'], setting['safeeyes_config']), False, False, 0)
box_settings.pack_start(
self.__load_text_item(setting['label'], setting['id'], setting['safeeyes_config']), False, False, 0)
elif setting['type'].upper() == 'BOOL':
box_settings.pack_start(self.__load_bool_item(setting['label'], setting['id'], setting['safeeyes_config']), False, False, 0)
box_settings.pack_start(
self.__load_bool_item(setting['label'], setting['id'], setting['safeeyes_config']), False, False, 0)
def __load_int_item(self, name, key, settings, min_value, max_value):
"""
@ -471,9 +482,12 @@ class BreakSettingsDialog:
self.switch_override_interval.connect('state-set', self.on_switch_override_interval_activate)
self.switch_override_duration.connect('state-set', self.on_switch_override_duration_activate)
self.switch_override_plugins.connect('state-set', self.on_switch_override_plugins_activate)
self.on_switch_override_interval_activate(self.switch_override_interval, self.switch_override_interval.get_active())
self.on_switch_override_duration_activate(self.switch_override_duration, self.switch_override_duration.get_active())
self.on_switch_override_plugins_activate(self.switch_override_plugins, self.switch_override_plugins.get_active())
self.on_switch_override_interval_activate(self.switch_override_interval,
self.switch_override_interval.get_active())
self.on_switch_override_duration_activate(self.switch_override_duration,
self.switch_override_duration.get_active())
self.on_switch_override_plugins_activate(self.switch_override_plugins,
self.switch_override_plugins.get_active())
def on_switch_override_interval_activate(self, switch_button, state):
"""
@ -498,7 +512,8 @@ class BreakSettingsDialog:
"""
Show a file chooser dialog and let the user to select an image.
"""
dialog = Gtk.FileChooserDialog(_('Please select an image'), self.window, Gtk.FileChooserAction.OPEN, (Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL, Gtk.STOCK_OPEN, Gtk.ResponseType.OK))
dialog = Gtk.FileChooserDialog(_('Please select an image'), self.window, Gtk.FileChooserAction.OPEN,
(Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL, Gtk.STOCK_OPEN, Gtk.ResponseType.OK))
png_filter = Gtk.FileFilter()
png_filter.set_name("PNG files")

View File

@ -1,71 +0,0 @@
#!/usr/bin/env python
# Safe Eyes is a utility to remind you to take break frequently
# to protect your eyes from eye strain.
# Copyright (C) 2021 Gobinath
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
import logging
import os
import re
import subprocess
class DesktopEnvironment:
def __init__(self, name: str, display_server: str):
self.name: str = name
self.display_server: str = display_server
@staticmethod
def get_env():
return DesktopEnvironment(DesktopEnvironment.__get_desktop_env(),
"wayland" if DesktopEnvironment.__is_wayland() else "xorg")
@staticmethod
def __get_desktop_env() -> str:
"""
Detect the desktop environment.
"""
desktop_session = os.environ.get('DESKTOP_SESSION')
current_desktop = os.environ.get('XDG_CURRENT_DESKTOP')
env = 'unknown'
if desktop_session is not None:
desktop_session = desktop_session.lower()
if desktop_session in ['gnome', 'unity', 'budgie-desktop', 'cinnamon', 'mate', 'xfce4', 'lxde', 'pantheon',
'fluxbox', 'blackbox', 'openbox', 'icewm', 'jwm', 'afterstep', 'trinity', 'kde']:
env = desktop_session
elif desktop_session.startswith('xubuntu') or (current_desktop is not None and 'xfce' in current_desktop):
env = 'xfce'
elif desktop_session.startswith('lubuntu'):
env = 'lxde'
elif 'plasma' in desktop_session or desktop_session.startswith('kubuntu') or os.environ.get(
'KDE_FULL_SESSION') == 'true':
env = 'kde'
elif os.environ.get('GNOME_DESKTOP_SESSION_ID'):
env = 'gnome'
elif desktop_session.startswith('ubuntu'):
env = 'unity'
return env
@staticmethod
def __is_wayland() -> bool:
"""
Determine if Wayland is running
https://unix.stackexchange.com/a/325972/222290
"""
try:
session_id = subprocess.check_output(['loginctl']).split(b'\n')[1].split()[0]
output = subprocess.check_output(['loginctl', 'show-session', session_id, '-p', 'Type'])
except BaseException:
logging.warning('Unable to determine if wayland is running. Assuming no.')
return False
else:
return bool(re.search(b'wayland', output, re.IGNORECASE))

View File

@ -21,15 +21,12 @@ This module contains utility functions for Safe Eyes and its plugins.
"""
import errno
import imp
import importlib
import inspect
import json
import locale
import logging
import os
import shutil
import subprocess
import sys
from distutils.version import LooseVersion
from logging.handlers import RotatingFileHandler
@ -41,7 +38,6 @@ import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GLib
from gi.repository import GdkPixbuf
gi.require_version('Gdk', '3.0')
@ -85,20 +81,6 @@ def get_resource_path(resource_name):
return resource_location
def execute_main_thread(target_function, arg1=None, arg2=None, arg3=None):
"""
Execute the given function in main thread.
"""
if arg1 is not None and arg2 is not None and arg3 is not None:
GLib.idle_add(lambda: target_function(arg1, arg2, arg3))
elif arg1 is not None and arg2 is not None:
GLib.idle_add(lambda: target_function(arg1, arg2))
elif arg1 is not None:
GLib.idle_add(lambda: target_function(arg1))
else:
GLib.idle_add(target_function)
def system_locale(category=locale.LC_MESSAGES):
"""
Return the system locale. If not available, return en_US.UTF-8.
@ -176,100 +158,6 @@ def delete(file_path):
pass
def check_plugin_dependencies(plugin_id, plugin_config, plugin_settings, plugin_path):
"""
Check the plugin dependencies.
"""
# Check the desktop environment
if plugin_config['dependencies']['desktop_environments']:
# Plugin has restrictions on desktop environments
if DESKTOP_ENVIRONMENT not in plugin_config['dependencies']['desktop_environments']:
return _('Plugin does not support %s desktop environment') % DESKTOP_ENVIRONMENT
# Check the Python modules
for module in plugin_config['dependencies']['python_modules']:
if not module_exist(module):
return _("Please install the Python module '%s'") % module
# Check the shell commands
for command in plugin_config['dependencies']['shell_commands']:
if not command_exist(command):
return _("Please install the command-line tool '%s'") % command
# Check the resources
for resource in plugin_config['dependencies']['resources']:
if get_resource_path(resource) is None:
return _('Please add the resource %(resource)s to %(config_resource)s directory') % {'resource': resource,
'config_resource': CONFIG_RESOURCE}
plugin_dependency_checker = os.path.join(plugin_path, 'dependency_checker.py')
if os.path.isfile(plugin_dependency_checker):
dependency_checker = importlib.import_module((plugin_id + '.dependency_checker'))
if dependency_checker and hasattr(dependency_checker, "validate"):
return dependency_checker.validate(plugin_config, plugin_settings)
return None
def load_plugins_config(safeeyes_config):
"""
Load all the plugins from the given directory.
"""
configs = []
for plugin in safeeyes_config.get('plugins'):
plugin_path = os.path.join(SYSTEM_PLUGINS_DIR, plugin['id'])
if not os.path.isdir(plugin_path):
# User plugin
plugin_path = os.path.join(USER_PLUGINS_DIR, plugin['id'])
plugin_config_path = os.path.join(plugin_path, 'config.json')
plugin_icon_path = os.path.join(plugin_path, 'icon.png')
plugin_module_path = os.path.join(plugin_path, 'plugin.py')
if not os.path.isfile(plugin_module_path):
return
icon = None
if os.path.isfile(plugin_icon_path):
icon = plugin_icon_path
else:
icon = get_resource_path('ic_plugin.png')
config = load_json(plugin_config_path)
if config is None:
continue
dependency_description = check_plugin_dependencies(plugin['id'], config, plugin.get('settings', {}),
plugin_path)
if dependency_description:
plugin['enabled'] = False
config['error'] = True
config['meta']['description'] = dependency_description
icon = get_resource_path('ic_warning.png')
else:
config['error'] = False
config['id'] = plugin['id']
config['icon'] = icon
config['enabled'] = plugin['enabled']
for setting in config['settings']:
setting['safeeyes_config'] = plugin['settings']
configs.append(config)
return configs
def execute_command(command, args=[]):
"""
Execute the shell command without waiting for its response.
"""
if command:
command_to_execute = []
if isinstance(command, str):
command_to_execute.append(command)
else:
command_to_execute.extend(command)
if args:
command_to_execute.extend(args)
try:
subprocess.Popen(command_to_execute)
except BaseException:
logging.error('Error in executing the command ' + str(command))
def command_exist(command):
"""
Check whether the given command exist in the system or not.
@ -279,17 +167,6 @@ def command_exist(command):
return False
def module_exist(module):
"""
Check wther the given Python module exists or not.
"""
try:
imp.find_module(module)
return True
except ImportError:
return False
def merge_configs(new_config, old_config):
"""
Merge the values of old_config into the new_config.