test SafeEyesCore: add handling for condvar, add explanation, check exact times

This commit is contained in:
deltragon 2024-08-25 22:33:32 +02:00
parent d43e8dc8bb
commit a823848a91
1 changed files with 116 additions and 97 deletions

View File

@ -23,14 +23,58 @@ class TestSafeEyesCore:
@pytest.fixture
def sequential_threading(self, monkeypatch, time_machine):
# executes instantly
# TODO: separate thread?
"""This fixture allows stopping threads at any point.
It is hard-coded for SafeEyesCore, the handle class returned by the fixture must be initialized
with a SafeEyesCore instance to be patched.
With this, all sleeping/blocking/thread starting calls inside SafeEyesCore are intercepted, and paused.
Additionally, all threads inside SafeEyesCore run sequentially.
The test code can use the next() method to unpause the thread,
which will run until the next sleeping/blocking/thread starting call,
after which it needs to be woken up using next() again.
The next() method blocks the test code while the thread is running.
"""
# executes instantly, on the same thread
# no need to switch threads, as we don't use any gtk things
monkeypatch.setattr(
core.utility,
"execute_main_thread",
lambda target_function, *args, **kwargs: target_function(*args, **kwargs)
)
handle = None
def utility_start_thread(target_function, **kwargs):
if not handle:
raise Exception("handle must be initialized before first thread")
handle.utility_start_thread(target_function, **kwargs)
def time_sleep(time):
if not handle:
raise Exception("handle must be initialized before first sleep call")
handle.sleep(time)
monkeypatch.setattr(
core.utility,
"start_thread",
utility_start_thread
)
monkeypatch.setattr(
core.time,
"sleep",
time_sleep
)
class PatchedCondition(threading.Condition):
def __init__(self, handle):
super().__init__()
self.handle = handle
def wait(self, timeout):
self.handle.wait_condvar(timeout)
class Handle:
thread = None
@ -39,8 +83,16 @@ class TestSafeEyesCore:
condvar_in = threading.Condition()
condvar_out = threading.Condition()
def __init__(self, time_machine):
def __init__(self, safe_eyes_core):
nonlocal handle
nonlocal time_machine
if handle:
raise Exception("only one handle is allowed per test call")
handle = self
self.time_machine = time_machine
self.safe_eyes_core = safe_eyes_core
self.safe_eyes_core.waiting_condition = PatchedCondition(self)
def background_thread(self):
while True:
@ -50,11 +102,8 @@ class TestSafeEyesCore:
raise Exception("thread timed out")
if not self.running:
logging.debug(f"background task shutdown")
break
logging.debug(f"background task woken up")
if self.task_queue:
(target_function, kwargs) = self.task_queue.popleft()
logging.debug(f"thread started {target_function}")
@ -74,11 +123,23 @@ class TestSafeEyesCore:
if not success:
raise Exception("thread timed out")
def wait_condvar(self, time):
if self.thread is not threading.current_thread():
raise Exception("waiting on condition may only happen in thread")
with self.condvar_out:
self.condvar_out.notify()
self.time_machine.shift(delta=datetime.timedelta(seconds=time))
with self.condvar_in:
success = self.condvar_in.wait(1)
if not success:
raise Exception("thread timed out")
def utility_start_thread(self, target_function, **kwargs):
self.task_queue.append((target_function, kwargs))
if self.thread is None:
self.thread = threading.Thread(target=self.background_thread, name="WorkThread", daemon=False, kwargs=kwargs)
self.thread = threading.Thread(target=self.background_thread, name="SequentialThreadingRunner", daemon=False, kwargs=kwargs)
self.thread.start()
def next(self):
@ -87,7 +148,6 @@ class TestSafeEyesCore:
with self.condvar_in:
self.condvar_in.notify()
def wait(self):
# wait until done:
with self.condvar_out:
success = self.condvar_out.wait(1)
@ -102,22 +162,9 @@ class TestSafeEyesCore:
if self.thread:
self.thread.join(1)
handle = Handle(time_machine=time_machine)
monkeypatch.setattr(
core.utility,
"start_thread",
handle.utility_start_thread
)
monkeypatch.setattr(
core.time,
"sleep",
lambda time: handle.sleep(time)
)
yield handle
yield Handle
if handle:
handle.stop()
def test_create_empty(self):
@ -190,12 +237,13 @@ class TestSafeEyesCore:
safe_eyes_core.initialize(config)
sequential_threading_handle = sequential_threading(safe_eyes_core)
safe_eyes_core.start()
# start __scheduler_job
sequential_threading.next()
# FIXME: sleep is needed so code reaches the waiting_condition
sleep(0.1)
sequential_threading_handle.next()
assert context['state'] == model.State.WAITING
on_update_next_break.assert_called_once()
@ -203,32 +251,21 @@ class TestSafeEyesCore:
assert on_update_next_break.call_args[0][0].name == "translated!: break 1"
on_update_next_break.reset_mock()
with safe_eyes_core.lock:
time_machine.shift(delta=datetime.timedelta(minutes=15))
with safe_eyes_core.waiting_condition:
logging.debug("notify")
safe_eyes_core.waiting_condition.notify_all()
logging.debug("wait for end of __scheduler_job")
sequential_threading.wait()
logging.debug("done waiting for end of __scheduler_job")
# wait for end of __scheduler_job - we cannot stop while waiting on the condvar
# this just moves us into waiting for __wait_until_prepare to start
sequential_threading_handle.next()
safe_eyes_core.stop()
assert context['state'] == model.State.STOPPED
logging.debug("done")
def run_next_break(
self,
sequential_threading,
sequential_threading_handle,
time_machine,
safe_eyes_core,
context,
break_duration,
break_interval,
pre_break_warning_time,
break_name_translated
):
"""Run one entire cycle of safe_eyes_core.
@ -250,9 +287,10 @@ class TestSafeEyesCore:
safe_eyes_core.on_count_down += on_count_down
# start __scheduler_job
sequential_threading.next()
# FIXME: sleep is needed so code reaches the waiting_condition
sleep(0.1)
sequential_threading_handle.next()
# wait until it reaches the condvar
assert context['state'] == model.State.WAITING
on_update_next_break.assert_called_once()
@ -260,16 +298,9 @@ class TestSafeEyesCore:
assert on_update_next_break.call_args[0][0].name == break_name_translated
on_update_next_break.reset_mock()
with safe_eyes_core.lock:
time_machine.shift(delta=datetime.timedelta(minutes=break_interval))
with safe_eyes_core.waiting_condition:
logging.debug("notify")
safe_eyes_core.waiting_condition.notify_all()
logging.debug("wait for end of __scheduler_job")
sequential_threading.wait()
logging.debug("done waiting for end of __scheduler_job")
# continue after condvar
sequential_threading_handle.next()
# end of __scheduler_job
assert context['state'] == model.State.PRE_BREAK
@ -279,27 +310,18 @@ class TestSafeEyesCore:
on_pre_break.reset_mock()
# start __wait_until_prepare
sequential_threading.next()
sequential_threading_handle.next()
# FIXME: sleep is needed so code reaches the waiting_condition
sleep(0.1)
with safe_eyes_core.lock:
time_machine.shift(delta=datetime.timedelta(seconds=pre_break_warning_time))
with safe_eyes_core.waiting_condition:
logging.debug("notify")
safe_eyes_core.waiting_condition.notify_all()
logging.debug("wait for end of __wait_until_prepare")
sequential_threading.wait()
logging.debug("done waiting for end of __wait_until_prepare")
# wait until it reaches the condvar
# continue after condvar
sequential_threading_handle.next()
# end of __wait_until_prepare
# start __start_break
sequential_threading.next()
sequential_threading.wait()
sequential_threading_handle.next()
# first sleep in __start_break
sequential_threading.next()
sequential_threading_handle.next()
on_start_break.assert_called_once()
assert isinstance(on_start_break.call_args[0][0], model.Break)
@ -314,13 +336,11 @@ class TestSafeEyesCore:
assert context['state'] == model.State.BREAK
# continue sleep in __start_break
for i in range(break_duration - 1):
sequential_threading.wait()
sequential_threading.next()
for i in range(break_duration - 2):
sequential_threading_handle.next()
logging.debug("wait for end of __start_break")
sequential_threading.wait()
logging.debug("done waiting for end of __start_break")
sequential_threading_handle.next()
# end of __start_break
on_count_down.assert_called()
assert on_count_down.call_count == break_duration
@ -333,7 +353,7 @@ class TestSafeEyesCore:
string += "+00:00"
assert datetime.datetime.now(datetime.timezone.utc) == datetime.datetime.fromisoformat(string)
def test_actual(self, sequential_threading, time_machine):
def test_full_run_with_defaults(self, sequential_threading, time_machine):
context = {
"session": {},
}
@ -367,87 +387,86 @@ class TestSafeEyesCore:
safe_eyes_core = core.SafeEyesCore(context)
sequential_threading_handle = sequential_threading(safe_eyes_core)
safe_eyes_core.initialize(config)
safe_eyes_core.start()
self.run_next_break(
sequential_threading,
sequential_threading_handle,
time_machine,
safe_eyes_core,
context,
short_break_duration,
short_break_interval,
pre_break_warning_time,
"translated!: break 1"
)
# Time passed: 15min 25s
# 15min short_break_interval, 10 seconds pre_break_warning_time, 15 seconds short_break_duration
self.assert_datetime("2024-08-25T13:15:25")
self.run_next_break(
sequential_threading,
sequential_threading_handle,
time_machine,
safe_eyes_core,
context,
short_break_duration,
short_break_interval,
pre_break_warning_time,
"translated!: break 2"
)
self.assert_datetime("2024-08-25T13:30:50")
self.run_next_break(
sequential_threading,
sequential_threading_handle,
time_machine,
safe_eyes_core,
context,
short_break_duration,
short_break_interval,
pre_break_warning_time,
"translated!: break 3"
)
self.assert_datetime("2024-08-25T13:46:15")
self.run_next_break(
sequential_threading,
sequential_threading_handle,
time_machine,
safe_eyes_core,
context,
short_break_duration,
short_break_interval,
pre_break_warning_time,
"translated!: break 4"
)
self.assert_datetime("2024-08-25T14:01:40")
self.run_next_break(
sequential_threading,
sequential_threading_handle,
time_machine,
safe_eyes_core,
context,
long_break_duration,
long_break_interval,
pre_break_warning_time,
"translated!: long break 1"
)
#self.assert_datetime("2024-08-25T14:16:40")
# Time passed: 16min 10s
# 15min short_break_interval (from previous, as long_break_interval must be multiple)
# 10 seconds pre_break_warning_time, 1 minute long_break_duration
self.assert_datetime("2024-08-25T14:17:50")
self.run_next_break(
sequential_threading,
sequential_threading_handle,
time_machine,
safe_eyes_core,
context,
short_break_duration,
short_break_interval,
pre_break_warning_time,
"translated!: break 1"
)
# Time passed: 15min 25s
# 15min short_break_interval, 10 seconds pre_break_warning_time, 15 seconds short_break_duration
self.assert_datetime("2024-08-25T14:33:15")
safe_eyes_core.stop()
assert context['state'] == model.State.STOPPED