|mod] checker: improve searx-checker command line

* output is unbuffered
* verbose mode describe more precisly the errrors
This commit is contained in:
Alexandre Flament 2021-01-08 19:04:04 +01:00
parent 3a9f513521
commit 45bfab77d0
2 changed files with 67 additions and 33 deletions

View File

@ -1,8 +1,10 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
import sys
import io
import os
import argparse
import logging
import searx.search
import searx.search.checker
@ -10,6 +12,14 @@ from searx.search import processors
from searx.engines import engine_shortcuts
# configure logging
root = logging.getLogger()
handler = logging.StreamHandler(sys.stdout)
for h in root.handlers:
root.removeHandler(h)
root.addHandler(handler)
# color only for a valid terminal
if sys.stdout.isatty() and os.environ.get('TERM') not in ['dumb', 'unknown']:
RESET_SEQ = "\033[0m"
COLOR_SEQ = "\033[1;%dm"
@ -21,7 +31,12 @@ else:
BOLD_SEQ = ""
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = "", "", "", "", "", "", "", ""
# equivalent of 'python -u' (unbuffered stdout, stderr)
stdout = io.TextIOWrapper(open(sys.stdout.fileno(), 'wb', 0), write_through=True)
stderr = io.TextIOWrapper(open(sys.stderr.fileno(), 'wb', 0), write_through=True)
# iterator of processors
def iter_processor(engine_name_list):
if len(engine_name_list) > 0:
for name in engine_name_list:
@ -30,38 +45,49 @@ def iter_processor(engine_name_list):
if processor is not None:
yield name, processor
else:
print(BOLD_SEQ, 'Engine ', '%-30s' % name, RESET_SEQ, RED, ' Not found ', RESET_SEQ)
stdout.write(f'{BOLD_SEQ}Engine {name:30}{RESET_SEQ}{RED}Engine does not exist{RESET_SEQ}')
else:
for name, processor in searx.search.processors.items():
yield name, processor
def run(engine_name_list):
# actual check & display
def run(engine_name_list, verbose):
searx.search.initialize()
broken_urls = []
for name, processor in iter_processor(engine_name_list):
if sys.stdout.isatty():
print(BOLD_SEQ, 'Engine ', '%-30s' % name, RESET_SEQ, WHITE, ' Checking', RESET_SEQ)
stdout.write(f'{BOLD_SEQ}Engine {name:30}{RESET_SEQ}Checking\n')
if not sys.stdout.isatty():
stderr.write(f'{BOLD_SEQ}Engine {name:30}{RESET_SEQ}Checking\n')
checker = searx.search.checker.Checker(processor)
checker.run()
if checker.test_results.succesfull:
print(BOLD_SEQ, 'Engine ', '%-30s' % name, RESET_SEQ, GREEN, ' OK', RESET_SEQ)
stdout.write(f'{BOLD_SEQ}Engine {name:30}{RESET_SEQ}{GREEN}OK{RESET_SEQ}\n')
if verbose:
stdout.write(f' {"found languages":15}: {" ".join(sorted(list(checker.test_results.languages)))}\n')
else:
errors = [test_name + ': ' + error for test_name, error in checker.test_results]
print(BOLD_SEQ, 'Engine ', '%-30s' % name, RESET_SEQ, RED, ' Error ', str(errors), RESET_SEQ)
broken_urls += checker.test_results.broken_urls
for url in broken_urls:
print('Error fetching', url)
stdout.write(f'{BOLD_SEQ}Engine {name:30}{RESET_SEQ}{RESET_SEQ}{RED}Error{RESET_SEQ}')
if not verbose:
errors = [test_name + ': ' + error for test_name, error in checker.test_results]
stdout.write(f'{RED}Error {str(errors)}{RESET_SEQ}\n')
else:
stdout.write('\n')
stdout.write(f' {"found languages":15}: {" ".join(sorted(list(checker.test_results.languages)))}\n')
for test_name, logs in checker.test_results.logs.items():
for log in logs:
stdout.write(f' {test_name:15}: {RED}{" ".join(log)}{RESET_SEQ}\n')
# call by setup.py
def main():
parser = argparse.ArgumentParser(description='Check searx engines.')
parser.add_argument('engine_name_list', metavar='engine name', type=str, nargs='*',
help='engines name or shortcut list. Empty for all engines.')
parser.add_argument('--verbose', '-v',
action='store_true', dest='verbose',
help='Display details about the test results',
default=False)
args = parser.parse_args()
run(args.engine_name_list)
run(args.engine_name_list, args.verbose)
if __name__ == '__main__':

View File

@ -17,6 +17,8 @@ from searx.search.models import SearchQuery, EngineRef
from searx.search.processors import EngineProcessor
logger = logger.getChild('searx.search.checker')
HTML_TAGS = [
'embed', 'iframe', 'object', 'param', 'picture', 'source', 'svg', 'math', 'canvas', 'noscript', 'script',
'del', 'ins', 'area', 'audio', 'img', 'map', 'track', 'video', 'a', 'abbr', 'b', 'bdi', 'bdo', 'br', 'cite',
@ -121,20 +123,25 @@ def _search_query_diff(sq1: SearchQuery, sq2: SearchQuery)\
class TestResults:
__slots__ = 'errors', 'broken_urls'
__slots__ = 'errors', 'logs', 'languages'
def __init__(self):
self.errors: typing.Dict[str, typing.List[str]] = {}
self.broken_urls = []
self.logs: typing.Dict[str, typing.List[typing.Any]] = {}
self.languages: typing.Set[str] = set()
def add_error(self, test, message):
def add_error(self, test, message, *args):
# message to self.errors
errors_for_test = self.errors.setdefault(test, [])
if message not in errors_for_test:
errors_for_test.append(message)
# (message, *args) to self.logs
logs_for_test = self.logs.setdefault(test, [])
if (message, *args) not in logs_for_test:
logs_for_test.append((message, *args))
def add_broken_url(self, url):
if url not in self.broken_urls:
self.broken_urls.append(url)
def add_language(self, language):
self.languages.add(language)
@property
def succesfull(self):
@ -167,20 +174,23 @@ class ResultContainerTests:
results = self.result_container.get_ordered_results()
return [result['url'] for result in results]
def _record_error(self, message: str) -> None:
self.test_results.add_error(self.test_name, message)
def _record_error(self, message: str, *args) -> None:
sq = _search_query_to_dict(self.search_query)
sqstr = ' '.join(['{}={!r}'.format(k, v) for k, v in sq.items()])
self.test_results.add_error(self.test_name, message, *args, '(' + sqstr + ')')
def _add_language(self, text: str) -> typing.Optional[str]:
r = cld3.get_language(str(text)) # pylint: disable=E1101
if r is not None and r.probability >= 0.9 and r.is_reliable:
if r is not None and r.probability >= 0.98 and r.is_reliable:
self.languages.add(r.language)
self.test_results.add_language(r.language)
return None
def _check_result(self, result):
if not _check_no_html(result.get('title', '')):
self._record_error('HTML in title')
self._record_error('HTML in title', repr(result.get('title', '')))
if not _check_no_html(result.get('content', '')):
self._record_error('HTML in content')
self._record_error('HTML in content', repr(result.get('content', '')))
self._add_language(result.get('title', ''))
self._add_language(result.get('content', ''))
@ -198,13 +208,11 @@ class ResultContainerTests:
thumbnail_src = result.get('thumbnail_src')
if thumbnail_src is not None:
if not _is_url_image(thumbnail_src):
self.test_results.add_broken_url(thumbnail_src)
self._record_error('thumbnail_src URL is invalid')
self._record_error('thumbnail_src URL is invalid', thumbnail_src)
elif not _is_url_image(result.get('img_src')):
self.test_results.add_broken_url(result.get('img_src'))
self._record_error('img_src URL is invalid')
self._record_error('img_src URL is invalid', result.get('img_src'))
if template == 'videos.html' and not _is_url_image(result.get('thumbnail')):
self._record_error('thumbnail URL is invalid')
self._record_error('thumbnail URL is invalid', result.get('img_src'))
def _check_results(self, results: list):
for result in results:
@ -213,16 +221,16 @@ class ResultContainerTests:
def _check_answers(self, answers):
for answer in answers:
if not _check_no_html(answer):
self._record_error('HTML in answer')
self._record_error('HTML in answer', answer)
def _check_infoboxes(self, infoboxes):
for infobox in infoboxes:
if not _check_no_html(infobox.get('content', '')):
self._record_error('HTML in infobox content')
self._record_error('HTML in infobox content', infobox.get('content', ''))
self._add_language(infobox.get('content', ''))
for attribute in infobox.get('attributes', {}):
if not _check_no_html(attribute.get('value', '')):
self._record_error('HTML in infobox attribute value')
self._record_error('HTML in infobox attribute value', attribute.get('value', ''))
def check_basic(self):
if len(self.result_container.unresponsive_engines) > 0: