diff --git a/searx/search/checker/__main__.py b/searx/search/checker/__main__.py index 37b7e6cd..75b37e6c 100644 --- a/searx/search/checker/__main__.py +++ b/searx/search/checker/__main__.py @@ -1,8 +1,10 @@ # SPDX-License-Identifier: AGPL-3.0-or-later import sys +import io import os import argparse +import logging import searx.search import searx.search.checker @@ -10,6 +12,14 @@ from searx.search import processors from searx.engines import engine_shortcuts +# configure logging +root = logging.getLogger() +handler = logging.StreamHandler(sys.stdout) +for h in root.handlers: + root.removeHandler(h) +root.addHandler(handler) + +# color only for a valid terminal if sys.stdout.isatty() and os.environ.get('TERM') not in ['dumb', 'unknown']: RESET_SEQ = "\033[0m" COLOR_SEQ = "\033[1;%dm" @@ -21,7 +31,12 @@ else: BOLD_SEQ = "" BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = "", "", "", "", "", "", "", "" +# equivalent of 'python -u' (unbuffered stdout, stderr) +stdout = io.TextIOWrapper(open(sys.stdout.fileno(), 'wb', 0), write_through=True) +stderr = io.TextIOWrapper(open(sys.stderr.fileno(), 'wb', 0), write_through=True) + +# iterator of processors def iter_processor(engine_name_list): if len(engine_name_list) > 0: for name in engine_name_list: @@ -30,38 +45,49 @@ def iter_processor(engine_name_list): if processor is not None: yield name, processor else: - print(BOLD_SEQ, 'Engine ', '%-30s' % name, RESET_SEQ, RED, ' Not found ', RESET_SEQ) + stdout.write(f'{BOLD_SEQ}Engine {name:30}{RESET_SEQ}{RED}Engine does not exist{RESET_SEQ}') else: for name, processor in searx.search.processors.items(): yield name, processor -def run(engine_name_list): +# actual check & display +def run(engine_name_list, verbose): searx.search.initialize() - broken_urls = [] for name, processor in iter_processor(engine_name_list): - if sys.stdout.isatty(): - print(BOLD_SEQ, 'Engine ', '%-30s' % name, RESET_SEQ, WHITE, ' Checking', RESET_SEQ) + stdout.write(f'{BOLD_SEQ}Engine {name:30}{RESET_SEQ}Checking\n') + if not sys.stdout.isatty(): + stderr.write(f'{BOLD_SEQ}Engine {name:30}{RESET_SEQ}Checking\n') checker = searx.search.checker.Checker(processor) checker.run() if checker.test_results.succesfull: - print(BOLD_SEQ, 'Engine ', '%-30s' % name, RESET_SEQ, GREEN, ' OK', RESET_SEQ) + stdout.write(f'{BOLD_SEQ}Engine {name:30}{RESET_SEQ}{GREEN}OK{RESET_SEQ}\n') + if verbose: + stdout.write(f' {"found languages":15}: {" ".join(sorted(list(checker.test_results.languages)))}\n') else: - errors = [test_name + ': ' + error for test_name, error in checker.test_results] - print(BOLD_SEQ, 'Engine ', '%-30s' % name, RESET_SEQ, RED, ' Error ', str(errors), RESET_SEQ) - - broken_urls += checker.test_results.broken_urls - - for url in broken_urls: - print('Error fetching', url) + stdout.write(f'{BOLD_SEQ}Engine {name:30}{RESET_SEQ}{RESET_SEQ}{RED}Error{RESET_SEQ}') + if not verbose: + errors = [test_name + ': ' + error for test_name, error in checker.test_results] + stdout.write(f'{RED}Error {str(errors)}{RESET_SEQ}\n') + else: + stdout.write('\n') + stdout.write(f' {"found languages":15}: {" ".join(sorted(list(checker.test_results.languages)))}\n') + for test_name, logs in checker.test_results.logs.items(): + for log in logs: + stdout.write(f' {test_name:15}: {RED}{" ".join(log)}{RESET_SEQ}\n') +# call by setup.py def main(): parser = argparse.ArgumentParser(description='Check searx engines.') parser.add_argument('engine_name_list', metavar='engine name', type=str, nargs='*', help='engines name or shortcut list. Empty for all engines.') + parser.add_argument('--verbose', '-v', + action='store_true', dest='verbose', + help='Display details about the test results', + default=False) args = parser.parse_args() - run(args.engine_name_list) + run(args.engine_name_list, args.verbose) if __name__ == '__main__': diff --git a/searx/search/checker/impl.py b/searx/search/checker/impl.py index abef5f8e..71a941f7 100644 --- a/searx/search/checker/impl.py +++ b/searx/search/checker/impl.py @@ -17,6 +17,8 @@ from searx.search.models import SearchQuery, EngineRef from searx.search.processors import EngineProcessor +logger = logger.getChild('searx.search.checker') + HTML_TAGS = [ 'embed', 'iframe', 'object', 'param', 'picture', 'source', 'svg', 'math', 'canvas', 'noscript', 'script', 'del', 'ins', 'area', 'audio', 'img', 'map', 'track', 'video', 'a', 'abbr', 'b', 'bdi', 'bdo', 'br', 'cite', @@ -121,20 +123,25 @@ def _search_query_diff(sq1: SearchQuery, sq2: SearchQuery)\ class TestResults: - __slots__ = 'errors', 'broken_urls' + __slots__ = 'errors', 'logs', 'languages' def __init__(self): self.errors: typing.Dict[str, typing.List[str]] = {} - self.broken_urls = [] + self.logs: typing.Dict[str, typing.List[typing.Any]] = {} + self.languages: typing.Set[str] = set() - def add_error(self, test, message): + def add_error(self, test, message, *args): + # message to self.errors errors_for_test = self.errors.setdefault(test, []) if message not in errors_for_test: errors_for_test.append(message) + # (message, *args) to self.logs + logs_for_test = self.logs.setdefault(test, []) + if (message, *args) not in logs_for_test: + logs_for_test.append((message, *args)) - def add_broken_url(self, url): - if url not in self.broken_urls: - self.broken_urls.append(url) + def add_language(self, language): + self.languages.add(language) @property def succesfull(self): @@ -167,20 +174,23 @@ class ResultContainerTests: results = self.result_container.get_ordered_results() return [result['url'] for result in results] - def _record_error(self, message: str) -> None: - self.test_results.add_error(self.test_name, message) + def _record_error(self, message: str, *args) -> None: + sq = _search_query_to_dict(self.search_query) + sqstr = ' '.join(['{}={!r}'.format(k, v) for k, v in sq.items()]) + self.test_results.add_error(self.test_name, message, *args, '(' + sqstr + ')') def _add_language(self, text: str) -> typing.Optional[str]: r = cld3.get_language(str(text)) # pylint: disable=E1101 - if r is not None and r.probability >= 0.9 and r.is_reliable: + if r is not None and r.probability >= 0.98 and r.is_reliable: self.languages.add(r.language) + self.test_results.add_language(r.language) return None def _check_result(self, result): if not _check_no_html(result.get('title', '')): - self._record_error('HTML in title') + self._record_error('HTML in title', repr(result.get('title', ''))) if not _check_no_html(result.get('content', '')): - self._record_error('HTML in content') + self._record_error('HTML in content', repr(result.get('content', ''))) self._add_language(result.get('title', '')) self._add_language(result.get('content', '')) @@ -198,13 +208,11 @@ class ResultContainerTests: thumbnail_src = result.get('thumbnail_src') if thumbnail_src is not None: if not _is_url_image(thumbnail_src): - self.test_results.add_broken_url(thumbnail_src) - self._record_error('thumbnail_src URL is invalid') + self._record_error('thumbnail_src URL is invalid', thumbnail_src) elif not _is_url_image(result.get('img_src')): - self.test_results.add_broken_url(result.get('img_src')) - self._record_error('img_src URL is invalid') + self._record_error('img_src URL is invalid', result.get('img_src')) if template == 'videos.html' and not _is_url_image(result.get('thumbnail')): - self._record_error('thumbnail URL is invalid') + self._record_error('thumbnail URL is invalid', result.get('img_src')) def _check_results(self, results: list): for result in results: @@ -213,16 +221,16 @@ class ResultContainerTests: def _check_answers(self, answers): for answer in answers: if not _check_no_html(answer): - self._record_error('HTML in answer') + self._record_error('HTML in answer', answer) def _check_infoboxes(self, infoboxes): for infobox in infoboxes: if not _check_no_html(infobox.get('content', '')): - self._record_error('HTML in infobox content') + self._record_error('HTML in infobox content', infobox.get('content', '')) self._add_language(infobox.get('content', '')) for attribute in infobox.get('attributes', {}): if not _check_no_html(attribute.get('value', '')): - self._record_error('HTML in infobox attribute value') + self._record_error('HTML in infobox attribute value', attribute.get('value', '')) def check_basic(self): if len(self.result_container.unresponsive_engines) > 0: