From 4841ba2166107b14f2206ea4ae0cf7c68c62be1b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C3=85ke=20Forslund?=
Date: Sat, 6 Mar 2021 11:30:05 +0100
Subject: [PATCH 1/2] Replace multiprocessing with concurrent.futures

Multiprocessing pools can't safely be used in a threaded context in
Python 3.9+. This replaces the multiprocessing Pool with a
ProcessPoolExecutor from concurrent.futures.

Shut the executor down without waiting for it, so objects that time
out keep training in the background instead of blocking the caller.
---
 padatious/training_manager.py | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/padatious/training_manager.py b/padatious/training_manager.py
index 3bef33c..61f2edd 100644
--- a/padatious/training_manager.py
+++ b/padatious/training_manager.py
@@ -11,9 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import multiprocessing as mp
+from concurrent.futures import ProcessPoolExecutor
+from concurrent.futures import TimeoutError
 from functools import partial
-from multiprocessing.context import TimeoutError
 from os.path import join, isfile, isdir, splitext
 
 import padatious
@@ -95,15 +95,18 @@ def train(self, debug=True, single_thread=False, timeout=20):
                 train(i)
         else:
             # Train in multiple processes to disk
-            pool = mp.Pool()
+            pool = ProcessPoolExecutor()
             try:
-                pool.map_async(train, self.objects_to_train).get(timeout)
+                _ = list(pool.map(train,
+                                  self.objects_to_train,
+                                  timeout=timeout))
+
             except TimeoutError:
                 if debug:
                     print('Some objects timed out while training')
             finally:
-                pool.close()
-                pool.join()
+                # Return without waiting; stragglers finish in the background.
+                pool.shutdown(wait=False)
 
         # Load saved objects from disk
         for obj in self.objects_to_train:
From f56b2944d1d470e28b089e814cdaab388619926e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C3=85ke=20Forslund?=
Date: Sat, 3 Apr 2021 11:15:37 +0200
Subject: [PATCH 2/2] Add proper subprocess timeout

---
 padatious/intent_container.py | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

diff --git a/padatious/intent_container.py b/padatious/intent_container.py
index b020bf7..8a5544d 100644
--- a/padatious/intent_container.py
+++ b/padatious/intent_container.py
@@ -18,7 +18,7 @@
 import padaos
 import sys
 from functools import wraps
-from subprocess import call, check_output
+from subprocess import call, check_output, TimeoutExpired
 from threading import Thread
 
 from padatious.match_data import MatchData
@@ -261,12 +261,16 @@ def train_subprocess(self, *args, **kwargs):
         Returns:
             bool: True for success, False if timed out
         """
-        ret = call([
-            sys.executable, '-m', 'padatious', 'train', self.cache_dir,
-            '-d', json.dumps(self.serialized_args),
-            '-a', json.dumps(args),
-            '-k', json.dumps(kwargs),
-        ])
+        try:
+            ret = call([
+                sys.executable, '-m', 'padatious', 'train', self.cache_dir,
+                '-d', json.dumps(self.serialized_args),
+                '-a', json.dumps(args),
+                '-k', json.dumps(kwargs),
+            ], timeout=kwargs.get('timeout'))
+        except TimeoutExpired:
+            ret = 10  # Treat process timeout as internal timeout
+
         if ret == 2:
             raise TypeError(
                 'Invalid train arguments: {} {}'.format(