From 63f17d2e4c735767bfcb2d4fdb77d8c9ad8d9265 Mon Sep 17 00:00:00 2001 From: Alexandre Flament Date: Mon, 22 Feb 2021 18:13:50 +0100 Subject: [PATCH] [enh] autocomplete refactoring, autocomplete on external bangs --- searx/autocomplete.py | 96 ++-------- searx/query.py | 404 +++++++++++++++++++++++++++------------ searx/webapp.py | 39 ++-- tests/unit/test_query.py | 178 +++++++++++++++++ 4 files changed, 496 insertions(+), 221 deletions(-) diff --git a/searx/autocomplete.py b/searx/autocomplete.py index fbe634a5..75992a1d 100644 --- a/searx/autocomplete.py +++ b/searx/autocomplete.py @@ -20,97 +20,20 @@ from lxml import etree from json import loads from urllib.parse import urlencode +from requests import RequestException + from searx import settings -from searx.languages import language_codes -from searx.engines import ( - categories, engines, engine_shortcuts -) from searx.poolrequests import get as http_get +from searx.exceptions import SearxEngineResponseException def get(*args, **kwargs): if 'timeout' not in kwargs: kwargs['timeout'] = settings['outgoing']['request_timeout'] - + kwargs['raise_for_httperror'] = True return http_get(*args, **kwargs) -def searx_bang(full_query): - '''check if the searchQuery contain a bang, and create fitting autocompleter results''' - # check if there is a query which can be parsed - if len(full_query.getQuery()) == 0: - return [] - - results = [] - - # check if current query stats with !bang - first_char = full_query.getQuery()[0] - if first_char == '!' or first_char == '?': - if len(full_query.getQuery()) == 1: - # show some example queries - # TODO, check if engine is not avaliable - results.append(first_char + "images") - results.append(first_char + "wikipedia") - results.append(first_char + "osm") - else: - engine_query = full_query.getQuery()[1:] - - # check if query starts with categorie name - for categorie in categories: - if categorie.startswith(engine_query): - results.append(first_char + '{categorie}'.format(categorie=categorie)) - - # check if query starts with engine name - for engine in engines: - if engine.startswith(engine_query.replace('_', ' ')): - results.append(first_char + '{engine}'.format(engine=engine.replace(' ', '_'))) - - # check if query starts with engine shortcut - for engine_shortcut in engine_shortcuts: - if engine_shortcut.startswith(engine_query): - results.append(first_char + '{engine_shortcut}'.format(engine_shortcut=engine_shortcut)) - - # check if current query stats with :bang - elif first_char == ':': - if len(full_query.getQuery()) == 1: - # show some example queries - results.append(":en") - results.append(":en_us") - results.append(":english") - results.append(":united_kingdom") - else: - engine_query = full_query.getQuery()[1:] - - for lc in language_codes: - lang_id, lang_name, country, english_name = map(str.lower, lc) - - # check if query starts with language-id - if lang_id.startswith(engine_query): - if len(engine_query) <= 2: - results.append(':{lang_id}'.format(lang_id=lang_id.split('-')[0])) - else: - results.append(':{lang_id}'.format(lang_id=lang_id)) - - # check if query starts with language name - if lang_name.startswith(engine_query) or english_name.startswith(engine_query): - results.append(':{lang_name}'.format(lang_name=lang_name)) - - # check if query starts with country - if country.startswith(engine_query.replace('_', ' ')): - results.append(':{country}'.format(country=country.replace(' ', '_'))) - - # remove duplicates - result_set = set(results) - - # remove results which are already contained in the query - for query_part in full_query.query_parts: - if query_part in result_set: - result_set.remove(query_part) - - # convert result_set back to list - return list(result_set) - - def dbpedia(query, lang): # dbpedia autocompleter, no HTTPS autocomplete_url = 'https://lookup.dbpedia.org/api/search.asmx/KeywordSearch?' @@ -204,3 +127,14 @@ backends = {'dbpedia': dbpedia, 'qwant': qwant, 'wikipedia': wikipedia } + + +def search_autocomplete(backend_name, query, lang): + backend = backends.get(backend_name) + if backend is None: + return [] + + try: + return backend(query, lang) + except (RequestException, SearxEngineResponseException): + return [] diff --git a/searx/query.py b/searx/query.py index 38cb03ff..2e6a2aa4 100644 --- a/searx/query.py +++ b/searx/query.py @@ -1,162 +1,330 @@ -#!/usr/bin/env python - -''' -searx is free software: you can redistribute it and/or modify -it under the terms of the GNU Affero General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -searx is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Affero General Public License for more details. - -You should have received a copy of the GNU Affero General Public License -along with searx. If not, see < http://www.gnu.org/licenses/ >. - -(C) 2014 by Thomas Pointhuber, -''' +# SPDX-License-Identifier: AGPL-3.0-or-later +from abc import abstractmethod, ABC import re from searx.languages import language_codes from searx.engines import categories, engines, engine_shortcuts +from searx.external_bang import get_bang_definition_and_autocomplete from searx.search import EngineRef from searx.webutils import VALID_LANGUAGE_CODE +class QueryPartParser(ABC): + + __slots__ = "raw_text_query", "enable_autocomplete" + + @staticmethod + @abstractmethod + def check(raw_value): + """Check if raw_value can be parsed""" + + def __init__(self, raw_text_query, enable_autocomplete): + self.raw_text_query = raw_text_query + self.enable_autocomplete = enable_autocomplete + + @abstractmethod + def __call__(self, raw_value): + """Try to parse raw_value: set the self.raw_text_query properties + + return True if raw_value has been parsed + + self.raw_text_query.autocomplete_list is also modified + if self.enable_autocomplete is True + """ + + def _add_autocomplete(self, value): + if value not in self.raw_text_query.autocomplete_list: + self.raw_text_query.autocomplete_list.append(value) + + +class TimeoutParser(QueryPartParser): + + @staticmethod + def check(raw_value): + return raw_value[0] == '<' + + def __call__(self, raw_value): + value = raw_value[1:] + found = self._parse(value) if len(value) > 0 else False + if self.enable_autocomplete and not value: + self._autocomplete() + return found + + def _parse(self, value): + if not value.isdigit(): + return False + raw_timeout_limit = int(value) + if raw_timeout_limit < 100: + # below 100, the unit is the second ( <3 = 3 seconds timeout ) + self.raw_text_query.timeout_limit = float(raw_timeout_limit) + else: + # 100 or above, the unit is the millisecond ( <850 = 850 milliseconds timeout ) + self.raw_text_query.timeout_limit = raw_timeout_limit / 1000.0 + return True + + def _autocomplete(self): + for suggestion in ['<3', '<850']: + self._add_autocomplete(suggestion) + + +class LanguageParser(QueryPartParser): + + @staticmethod + def check(raw_value): + return raw_value[0] == ':' + + def __call__(self, raw_value): + value = raw_value[1:].lower().replace('_', '-') + found = self._parse(value) if len(value) > 0 else False + if self.enable_autocomplete and not found: + self._autocomplete(value) + return found + + def _parse(self, value): + found = False + # check if any language-code is equal with + # declared language-codes + for lc in language_codes: + lang_id, lang_name, country, english_name = map(str.lower, lc) + + # if correct language-code is found + # set it as new search-language + + if (value == lang_id + or value == lang_name + or value == english_name + or value.replace('-', ' ') == country)\ + and value not in self.raw_text_query.languages: + found = True + lang_parts = lang_id.split('-') + if len(lang_parts) == 2: + self.raw_text_query.languages.append(lang_parts[0] + '-' + lang_parts[1].upper()) + else: + self.raw_text_query.languages.append(lang_id) + # to ensure best match (first match is not necessarily the best one) + if value == lang_id: + break + + # user may set a valid, yet not selectable language + if VALID_LANGUAGE_CODE.match(value): + lang_parts = value.split('-') + if len(lang_parts) > 1: + value = lang_parts[0].lower() + '-' + lang_parts[1].upper() + if value not in self.raw_text_query.languages: + self.raw_text_query.languages.append(value) + found = True + + return found + + def _autocomplete(self, value): + if not value: + # show some example queries + for lang in [":en", ":en_us", ":english", ":united_kingdom"]: + self.raw_text_query.autocomplete_list.append(lang) + return + + for lc in language_codes: + lang_id, lang_name, country, english_name = map(str.lower, lc) + + # check if query starts with language-id + if lang_id.startswith(value): + if len(value) <= 2: + self._add_autocomplete(':' + lang_id.split('-')[0]) + else: + self._add_autocomplete(':' + lang_id) + + # check if query starts with language name + if lang_name.startswith(value) or english_name.startswith(value): + self._add_autocomplete(':' + lang_name) + + # check if query starts with country + # here "new_zealand" is "new-zealand" (see __call__) + if country.startswith(value.replace('-', ' ')): + self._add_autocomplete(':' + country.replace(' ', '_')) + + +class ExternalBangParser(QueryPartParser): + + @staticmethod + def check(raw_value): + return raw_value.startswith('!!') + + def __call__(self, raw_value): + value = raw_value[2:] + found, bang_ac_list = self._parse(value) if len(value) > 0 else (False, []) + if self.enable_autocomplete: + self._autocomplete(bang_ac_list) + return found + + def _parse(self, value): + found = False + bang_definition, bang_ac_list = get_bang_definition_and_autocomplete(value) + if bang_definition is not None: + self.raw_text_query.external_bang = value + found = True + return found, bang_ac_list + + def _autocomplete(self, bang_ac_list): + if not bang_ac_list: + bang_ac_list = ['g', 'ddg', 'bing'] + for external_bang in bang_ac_list: + self._add_autocomplete('!!' + external_bang) + + +class BangParser(QueryPartParser): + + @staticmethod + def check(raw_value): + return raw_value[0] == '!' or raw_value[0] == '?' + + def __call__(self, raw_value): + value = raw_value[1:].replace('-', ' ').replace('_', ' ') + found = self._parse(value) if len(value) > 0 else False + if found and raw_value[0] == '!': + self.raw_text_query.specific = True + if self.enable_autocomplete: + self._autocomplete(raw_value[0], value) + return found + + def _parse(self, value): + # check if prefix is equal with engine shortcut + if value in engine_shortcuts: + value = engine_shortcuts[value] + + # check if prefix is equal with engine name + if value in engines: + self.raw_text_query.enginerefs.append(EngineRef(value, 'none')) + return True + + # check if prefix is equal with categorie name + if value in categories: + # using all engines for that search, which + # are declared under that categorie name + self.raw_text_query.enginerefs.extend(EngineRef(engine.name, value) + for engine in categories[value] + if (engine.name, value) not in self.raw_text_query.disabled_engines) + return True + + return False + + def _autocomplete(self, first_char, value): + if not value: + # show some example queries + for suggestion in ['images', 'wikipedia', 'osm']: + if suggestion not in self.raw_text_query.disabled_engines or suggestion in categories: + self._add_autocomplete(first_char + suggestion) + return + + # check if query starts with categorie name + for category in categories: + if category.startswith(value): + self._add_autocomplete(first_char + category) + + # check if query starts with engine name + for engine in engines: + if engine.startswith(value): + self._add_autocomplete(first_char + engine.replace(' ', '_')) + + # check if query starts with engine shortcut + for engine_shortcut in engine_shortcuts: + if engine_shortcut.startswith(value): + self._add_autocomplete(first_char + engine_shortcut) + + class RawTextQuery: """parse raw text query (the value from the html input)""" + PARSER_CLASSES = [ + TimeoutParser, # this force the timeout + LanguageParser, # this force a language + ExternalBangParser, # external bang (must be before BangParser) + BangParser # this force a engine or category + ] + def __init__(self, query, disabled_engines): assert isinstance(query, str) + # input parameters self.query = query - self.disabled_engines = [] - - if disabled_engines: - self.disabled_engines = disabled_engines - - self.query_parts = [] - self.user_query_parts = [] + self.disabled_engines = disabled_engines if disabled_engines else [] + # parsed values self.enginerefs = [] self.languages = [] self.timeout_limit = None self.external_bang = None self.specific = False + self.autocomplete_list = [] + # internal properties + self.query_parts = [] # use self.getFullQuery() + self.user_query_parts = [] # use self.getQuery() + self.autocomplete_location = None self._parse_query() - # parse query, if tags are set, which - # change the search engine or search-language def _parse_query(self): - self.query_parts = [] + """ + parse self.query, if tags are set, which + change the search engine or search-language + """ # split query, including whitespaces raw_query_parts = re.split(r'(\s+)', self.query) - for query_part in raw_query_parts: - searx_query_part = False + last_index_location = None + autocomplete_index = len(raw_query_parts) - 1 + for i, query_part in enumerate(raw_query_parts): # part does only contain spaces, skip if query_part.isspace()\ or query_part == '': continue - # this force the timeout - if query_part[0] == '<': - try: - raw_timeout_limit = int(query_part[1:]) - if raw_timeout_limit < 100: - # below 100, the unit is the second ( <3 = 3 seconds timeout ) - self.timeout_limit = float(raw_timeout_limit) - else: - # 100 or above, the unit is the millisecond ( <850 = 850 milliseconds timeout ) - self.timeout_limit = raw_timeout_limit / 1000.0 - searx_query_part = True - except ValueError: - # error not reported to the user - pass - - # this force a language - if query_part[0] == ':' and len(query_part) > 1: - lang = query_part[1:].lower().replace('_', '-') - - # check if any language-code is equal with - # declared language-codes - for lc in language_codes: - lang_id, lang_name, country, english_name = map(str.lower, lc) - - # if correct language-code is found - # set it as new search-language - if (lang == lang_id - or lang == lang_name - or lang == english_name - or lang.replace('-', ' ') == country)\ - and lang not in self.languages: - searx_query_part = True - lang_parts = lang_id.split('-') - if len(lang_parts) == 2: - self.languages.append(lang_parts[0] + '-' + lang_parts[1].upper()) - else: - self.languages.append(lang_id) - # to ensure best match (first match is not necessarily the best one) - if lang == lang_id: - break - - # user may set a valid, yet not selectable language - if VALID_LANGUAGE_CODE.match(lang): - lang_parts = lang.split('-') - if len(lang_parts) > 1: - lang = lang_parts[0].lower() + '-' + lang_parts[1].upper() - if lang not in self.languages: - self.languages.append(lang) - searx_query_part = True - - # external bang - if query_part[0:2] == "!!": - self.external_bang = query_part[2:] - searx_query_part = True - continue - # this force a engine or category - if query_part[0] == '!' or query_part[0] == '?': - prefix = query_part[1:].replace('-', ' ').replace('_', ' ') - - # check if prefix is equal with engine shortcut - if prefix in engine_shortcuts: - searx_query_part = True - engine_name = engine_shortcuts[prefix] - if engine_name in engines: - self.enginerefs.append(EngineRef(engine_name, 'none')) - - # check if prefix is equal with engine name - elif prefix in engines: - searx_query_part = True - self.enginerefs.append(EngineRef(prefix, 'none')) - - # check if prefix is equal with categorie name - elif prefix in categories: - # using all engines for that search, which - # are declared under that categorie name - searx_query_part = True - self.enginerefs.extend(EngineRef(engine.name, prefix) - for engine in categories[prefix] - if (engine.name, prefix) not in self.disabled_engines) - - if query_part[0] == '!': - self.specific = True + # parse special commands + special_part = False + for parser_class in RawTextQuery.PARSER_CLASSES: + if parser_class.check(query_part): + special_part = parser_class(self, i == autocomplete_index)(query_part) + break # append query part to query_part list - if searx_query_part: - self.query_parts.append(query_part) - else: - self.user_query_parts.append(query_part) + qlist = self.query_parts if special_part else self.user_query_parts + qlist.append(query_part) + last_index_location = (qlist, len(qlist) - 1) + + self.autocomplete_location = last_index_location + + def get_autocomplete_full_query(self, text): + qlist, position = self.autocomplete_location + qlist[position] = text + return self.getFullQuery() def changeQuery(self, query): self.user_query_parts = query.strip().split() + self.query = self.getFullQuery() + self.autocomplete_location = (self.user_query_parts, len(self.user_query_parts) - 1) + self.autocomplete_list = [] return self def getQuery(self): return ' '.join(self.user_query_parts) def getFullQuery(self): - # get full querry including whitespaces - return '{0} {1}'.format(''.join(self.query_parts), self.getQuery()).strip() + """ + get full querry including whitespaces + """ + return '{0} {1}'.format(' '.join(self.query_parts), self.getQuery()).strip() + + def __str__(self): + return self.getFullQuery() + + def __repr__(self): + return f"<{self.__class__.__name__} " \ + + f"query={self.query!r} " \ + + f"disabled_engines={self.disabled_engines!r}\n " \ + + f"languages={self.languages!r} " \ + + f"timeout_limit={self.timeout_limit!r} "\ + + f"external_bang={self.external_bang!r} " \ + + f"specific={self.specific!r} " \ + + f"enginerefs={self.enginerefs!r}\n " \ + + f"autocomplete_list={self.autocomplete_list!r}\n " \ + + f"query_parts={self.query_parts!r}\n " \ + + f"user_query_parts={self.user_query_parts!r} >" diff --git a/searx/webapp.py b/searx/webapp.py index 4752eb27..f1034c27 100755 --- a/searx/webapp.py +++ b/searx/webapp.py @@ -74,12 +74,13 @@ from searx.languages import language_codes as languages from searx.search import SearchWithPlugins, initialize as search_initialize from searx.search.checker import get_result as checker_get_result from searx.query import RawTextQuery -from searx.autocomplete import searx_bang, backends as autocomplete_backends +from searx.autocomplete import search_autocomplete, backends as autocomplete_backends from searx.plugins import plugins from searx.plugins.oa_doi_rewrite import get_doi_resolver from searx.preferences import Preferences, ValidationException, LANGUAGE_CODES from searx.answerers import answerers from searx.poolrequests import get_global_proxies +from searx.answerers import ask from searx.metrology.error_recorder import errors_per_engines # serve pages with HTTP/1.1 @@ -763,27 +764,18 @@ def about(): def autocompleter(): """Return autocompleter results""" + # run autocompleter + results = [] + # set blocked engines disabled_engines = request.preferences.engines.get_disabled() # parse query raw_text_query = RawTextQuery(request.form.get('q', ''), disabled_engines) - # check if search query is set - if not raw_text_query.getQuery(): - return '', 400 - - # run autocompleter - completer = autocomplete_backends.get(request.preferences.get_value('autocomplete')) - - # parse searx specific autocompleter results like !bang - raw_results = searx_bang(raw_text_query) - # normal autocompletion results only appear if no inner results returned - # and there is a query part besides the engine and language bangs - if len(raw_results) == 0 and completer and (len(raw_text_query.query_parts) > 1 or - (len(raw_text_query.languages) == 0 and - not raw_text_query.specific)): + # and there is a query part + if len(raw_text_query.autocomplete_list) == 0 and len(raw_text_query.getQuery()) > 0: # get language from cookie language = request.preferences.get_value('language') if not language or language == 'all': @@ -791,15 +783,18 @@ def autocompleter(): else: language = language.split('-')[0] # run autocompletion - raw_results.extend(completer(raw_text_query.getQuery(), language)) + raw_results = search_autocomplete(request.preferences.get_value('autocomplete'), + raw_text_query.getQuery(), language) + for result in raw_results: + results.append(raw_text_query.changeQuery(result).getFullQuery()) - # parse results (write :language and !engine back to result string) - results = [] - for result in raw_results: - raw_text_query.changeQuery(result) + if len(raw_text_query.autocomplete_list) > 0: + for autocomplete_text in raw_text_query.autocomplete_list: + results.append(raw_text_query.get_autocomplete_full_query(autocomplete_text)) - # add parsed result - results.append(raw_text_query.getFullQuery()) + for answers in ask(raw_text_query): + for answer in answers: + results.append(str(answer['answer'])) # return autocompleter results if request.headers.get('X-Requested-With') == 'XMLHttpRequest': diff --git a/tests/unit/test_query.py b/tests/unit/test_query.py index 0176f735..6bbfdb1e 100644 --- a/tests/unit/test_query.py +++ b/tests/unit/test_query.py @@ -1,7 +1,20 @@ +from searx.search import initialize from searx.query import RawTextQuery from searx.testing import SearxTestCase +TEST_ENGINES = [ + { + 'name': 'dummy engine', + 'engine': 'dummy', + 'categories': 'general', + 'shortcut': 'du', + 'timeout': 3.0, + 'tokens': [], + }, +] + + class TestQuery(SearxTestCase): def test_simple_query(self): @@ -14,6 +27,37 @@ class TestQuery(SearxTestCase): self.assertEqual(len(query.languages), 0) self.assertFalse(query.specific) + def test_multiple_spaces_query(self): + query_text = '\tthe query' + query = RawTextQuery(query_text, []) + + self.assertEqual(query.getFullQuery(), 'the query') + self.assertEqual(len(query.query_parts), 0) + self.assertEqual(len(query.user_query_parts), 2) + self.assertEqual(len(query.languages), 0) + self.assertFalse(query.specific) + + def test_str_method(self): + query_text = '<7 the query' + query = RawTextQuery(query_text, []) + self.assertEqual(str(query), '<7 the query') + + def test_repr_method(self): + query_text = '<8 the query' + query = RawTextQuery(query_text, []) + r = repr(query) + self.assertTrue(r.startswith(f"