Fix issues in network after updating httpx to 0.21.x (#3169)

* [mod] upgrade httpx to 0.21.2

httpx 0.21.2 and httpcore 0.14.4 fix multiple issues:
* https://github.com/encode/httpx/releases/tag/0.21.2
* https://github.com/encode/httpcore/releases/tag/0.14.4

As a result, most of the workarounds in searx.network have been removed.

* pick even more changes from searxng

Co-authored-by: Alexandre Flament <alex@al-f.net>
Authored by Noémi Ványi on 2022-02-28 22:05:20 +01:00; committed via GitHub.
parent 0248777f95
commit 0669bfd7a5
11 changed files with 230 additions and 227 deletions


@@ -283,26 +283,3 @@ def load_engines(engine_list):
         if engine is not None:
             engines[engine.name] = engine
     return engines
-
-
-def initialize_engines(engine_list):
-    load_engines(engine_list)
-    initialize_network(engine_list, settings['outgoing'])
-
-    def engine_init(engine_name, init_fn):
-        try:
-            set_context_network_name(engine_name)
-            init_fn(get_engine_from_settings(engine_name))
-        except SearxEngineResponseException as exc:
-            logger.warn('%s engine: Fail to initialize // %s', engine_name, exc)
-        except Exception:
-            logger.exception('%s engine: Fail to initialize', engine_name)
-        else:
-            logger.debug('%s engine: Initialized', engine_name)
-
-    for engine_name, engine in engines.items():
-        if hasattr(engine, 'init'):
-            init_fn = getattr(engine, 'init')
-            if init_fn:
-                logger.debug('%s engine: Starting background initialization', engine_name)
-                threading.Thread(target=engine_init, args=(engine_name, init_fn)).start()


@@ -5,7 +5,6 @@ import logging
 import threading
 import uvloop
-import httpcore
 import httpx
 from httpx_socks import AsyncProxyTransport
 from python_socks import (
@@ -27,17 +26,6 @@ TRANSPORT_KWARGS = {
 }
-
-
-async def close_connections_for_url(connection_pool: httpcore.AsyncConnectionPool, url: httpx._models.URL):
-    logger.debug('Drop connections for %r', url.host)
-    connections_to_close = [conn for conn in connection_pool._pool if conn._origin == url.host]
-    for connection in connections_to_close:
-        connection_pool._pool.remove(connection)
-        try:
-            await connection.aclose()
-        except httpx.NetworkError as e:
-            logger.warning('Error closing an existing connection', exc_info=e)
 def get_sslcontexts(proxy_url=None, cert=None, verify=True, trust_env=True, http2=False):
     global SSLCONTEXTS
     key = (proxy_url, cert, verify, trust_env, http2)
@@ -49,74 +37,25 @@ def get_sslcontexts(proxy_url=None, cert=None, verify=True, trust_env=True, http2=False):
 class AsyncHTTPTransportNoHttp(httpx.AsyncHTTPTransport):
     """Block HTTP request"""

-    async def handle_async_request(self, method, url, headers=None, stream=None, extensions=None):
-        raise httpx.UnsupportedProtocol("HTTP protocol is disabled")
+    async def handle_async_request(self, request):
+        raise httpx.UnsupportedProtocol('HTTP protocol is disabled')


 class AsyncProxyTransportFixed(AsyncProxyTransport):
     """Fix httpx_socks.AsyncProxyTransport

-    Map python_socks exceptions to httpx.ProxyError
-    Map socket.gaierror to httpx.ConnectError
-    Note: keepalive_expiry is ignored, AsyncProxyTransport should call:
-    * self._keepalive_sweep()
-    * self._response_closed(self, connection)
-    Note: AsyncProxyTransport inherit from AsyncConnectionPool
+    Map python_socks exceptions to httpx.ProxyError exceptions
     """

-    async def handle_async_request(self, request: httpx.Request):
-        retry = 2
-        while retry > 0:
-            retry -= 1
-            try:
-                return await super().handle_async_request(request)
-            except (ProxyConnectionError, ProxyTimeoutError, ProxyError) as e:
-                raise httpx.ProxyError(e)
-            except OSError as e:
-                # socket.gaierror when DNS resolution fails
-                raise httpx.NetworkError(e)
-            except httpx.RemoteProtocolError as e:
-                # in case of httpx.RemoteProtocolError: Server disconnected
-                await close_connections_for_url(self, request.url)
-                logger.warning('httpx.RemoteProtocolError: retry', exc_info=e)
-                # retry
-            except (httpx.NetworkError, httpx.ProtocolError) as e:
-                # httpx.WriteError on HTTP/2 connection leaves a new opened stream
-                # then each new request creates a new stream and raise the same WriteError
-                await close_connections_for_url(self, request.url)
-                raise e
-
-
-class AsyncHTTPTransportFixed(httpx.AsyncHTTPTransport):
-    """Fix httpx.AsyncHTTPTransport"""
-
-    async def handle_async_request(self, request: httpx.Request):
-        retry = 2
-        while retry > 0:
-            retry -= 1
-            try:
-                return await super().handle_async_request(request)
-            except OSError as e:
-                # socket.gaierror when DNS resolution fails
-                raise httpx.ConnectError(e)
-            except httpx.CloseError as e:
-                # httpx.CloseError: [Errno 104] Connection reset by peer
-                # raised by _keepalive_sweep()
-                # from https://github.com/encode/httpcore/blob/4b662b5c42378a61e54d673b4c949420102379f5/httpcore/_backends/asyncio.py#L198  # noqa
-                await close_connections_for_url(self._pool, request.url)
-                logger.warning('httpx.CloseError: retry', exc_info=e)
-                # retry
-            except httpx.RemoteProtocolError as e:
-                # in case of httpx.RemoteProtocolError: Server disconnected
-                await close_connections_for_url(self._pool, request.url)
-                logger.warning('httpx.RemoteProtocolError: retry', exc_info=e)
-                # retry
-            except (httpx.ProtocolError, httpx.NetworkError) as e:
-                await close_connections_for_url(self._pool, request.url)
-                raise e
+    async def handle_async_request(self, request):
+        try:
+            return await super().handle_async_request(request)
+        except ProxyConnectionError as e:
+            raise httpx.ProxyError("ProxyConnectionError: " + e.strerror, request=request) from e
+        except ProxyTimeoutError as e:
+            raise httpx.ProxyError("ProxyTimeoutError: " + e.args[0], request=request) from e
+        except ProxyError as e:
+            raise httpx.ProxyError("ProxyError: " + e.args[0], request=request) from e


 def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
@@ -132,29 +71,35 @@ def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
     proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url)
     verify = get_sslcontexts(proxy_url, None, True, False, http2) if verify is True else verify
-    return AsyncProxyTransportFixed(proxy_type=proxy_type, proxy_host=proxy_host, proxy_port=proxy_port,
-                                    username=proxy_username, password=proxy_password,
-                                    rdns=rdns,
-                                    loop=get_loop(),
-                                    verify=verify,
-                                    http2=http2,
-                                    local_address=local_address,
-                                    max_connections=limit.max_connections,
-                                    max_keepalive_connections=limit.max_keepalive_connections,
-                                    keepalive_expiry=limit.keepalive_expiry,
-                                    retries=retries,
-                                    **TRANSPORT_KWARGS)
+    return AsyncProxyTransportFixed(
+        proxy_type=proxy_type,
+        proxy_host=proxy_host,
+        proxy_port=proxy_port,
+        username=proxy_username,
+        password=proxy_password,
+        rdns=rdns,
+        loop=get_loop(),
+        verify=verify,
+        http2=http2,
+        local_address=local_address,
+        limits=limit,
+        retries=retries,
+        **TRANSPORT_KWARGS,
+    )


 def get_transport(verify, http2, local_address, proxy_url, limit, retries):
     verify = get_sslcontexts(None, None, True, False, http2) if verify is True else verify
-    return AsyncHTTPTransportFixed(verify=verify,
-                                   http2=http2,
-                                   local_address=local_address,
-                                   proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
-                                   limits=limit,
-                                   retries=retries,
-                                   **TRANSPORT_KWARGS)
+    return httpx.AsyncHTTPTransport(
+        # pylint: disable=protected-access
+        verify=verify,
+        http2=http2,
+        limits=limit,
+        proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
+        local_address=local_address,
+        retries=retries,
+        **TRANSPORT_KWARGS,
+    )


 def iter_proxies(proxies):
@@ -168,7 +113,7 @@ def iter_proxies(proxies):
 def new_client(enable_http, verify, enable_http2,
                max_connections, max_keepalive_connections, keepalive_expiry,
-               proxies, local_address, retries, max_redirects):
+               proxies, local_address, retries, max_redirects, hook_log_response):
     limit = httpx.Limits(max_connections=max_connections,
                          max_keepalive_connections=max_keepalive_connections,
                          keepalive_expiry=keepalive_expiry)
@@ -189,7 +134,10 @@ def new_client(enable_http, verify, enable_http2,
         mounts['http://'] = AsyncHTTPTransportNoHttp()
     transport = get_transport(verify, enable_http2, local_address, None, limit, retries)
-    return httpx.AsyncClient(transport=transport, mounts=mounts, max_redirects=max_redirects)
+    event_hooks = None
+    if hook_log_response:
+        event_hooks = {'response': [hook_log_response]}
+    return httpx.AsyncClient(transport=transport, mounts=mounts, max_redirects=max_redirects, event_hooks=event_hooks)


 def get_loop():
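Note on the hook_log_response parameter above: it plugs into httpx's standard event-hook mechanism rather than a custom transport. The snippet below is a minimal standalone sketch of that mechanism; the log_response function and the URL are illustrative only and are not part of this commit.

import asyncio
import httpx

async def log_response(response: httpx.Response) -> None:
    # httpx calls each registered 'response' hook after a response is received;
    # this mirrors what Network.log_response does in the next file
    request = response.request
    print(f'HTTP Request: {request.method} {request.url} '
          f'"{response.http_version} {response.status_code} {response.reason_phrase}"')

async def main():
    # hooks are passed per event name as a list; an AsyncClient expects async callables
    async with httpx.AsyncClient(event_hooks={'response': [log_response]}) as client:
        await client.get('https://example.org')

asyncio.run(main())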


@@ -1,17 +1,23 @@
 # SPDX-License-Identifier: AGPL-3.0-or-later
+# lint: pylint
+# pylint: disable=global-statement
+# pylint: disable=missing-module-docstring, missing-class-docstring
 import atexit
 import asyncio
 import ipaddress
 from itertools import cycle
+from typing import Dict
 import httpx
+from searx import logger, searx_debug
 from .client import new_client, get_loop, AsyncHTTPTransportNoHttp
+logger = logger.getChild('network')
 DEFAULT_NAME = '__DEFAULT__'
-NETWORKS = {}
+NETWORKS: Dict[str, 'Network'] = {}
 # requests compatibility when reading proxy settings from settings.yml
 PROXY_PATTERN_MAPPING = {
     'http': 'http://',
@@ -26,34 +32,50 @@ PROXY_PATTERN_MAPPING = {
     'socks5h:': 'socks5h://',
 }
-ADDRESS_MAPPING = {
-    'ipv4': '0.0.0.0',
-    'ipv6': '::'
-}
+ADDRESS_MAPPING = {'ipv4': '0.0.0.0', 'ipv6': '::'}


 class Network:
-    __slots__ = ('enable_http', 'verify', 'enable_http2',
-                 'max_connections', 'max_keepalive_connections', 'keepalive_expiry',
-                 'local_addresses', 'proxies', 'using_tor_proxy', 'max_redirects', 'retries', 'retry_on_http_error',
-                 '_local_addresses_cycle', '_proxies_cycle', '_clients')
+    __slots__ = (
+        'enable_http',
+        'verify',
+        'enable_http2',
+        'max_connections',
+        'max_keepalive_connections',
+        'keepalive_expiry',
+        'local_addresses',
+        'proxies',
+        'using_tor_proxy',
+        'max_redirects',
+        'retries',
+        'retry_on_http_error',
+        '_local_addresses_cycle',
+        '_proxies_cycle',
+        '_clients',
+        '_logger',
+    )
     _TOR_CHECK_RESULT = {}

-    def __init__(self,
-                 enable_http=True,
-                 verify=True,
-                 enable_http2=False,
-                 max_connections=None,
-                 max_keepalive_connections=None,
-                 keepalive_expiry=None,
-                 proxies=None,
-                 using_tor_proxy=False,
-                 local_addresses=None,
-                 retries=0,
-                 retry_on_http_error=None,
-                 max_redirects=30):
+    def __init__(
+        # pylint: disable=too-many-arguments
+        self,
+        enable_http=True,
+        verify=True,
+        enable_http2=False,
+        max_connections=None,
+        max_keepalive_connections=None,
+        keepalive_expiry=None,
+        proxies=None,
+        using_tor_proxy=False,
+        local_addresses=None,
+        retries=0,
+        retry_on_http_error=None,
+        max_redirects=30,
+        logger_name=None,
+    ):
         self.enable_http = enable_http
         self.verify = verify
         self.enable_http2 = enable_http2
@@ -69,6 +91,7 @@ class Network:
         self._local_addresses_cycle = self.get_ipaddress_cycle()
         self._proxies_cycle = self.get_proxy_cycles()
         self._clients = {}
+        self._logger = logger.getChild(logger_name) if logger_name else logger
         self.check_parameters()

     def check_parameters(self):
@@ -85,7 +108,7 @@ class Network:
         local_addresses = self.local_addresses
         if not local_addresses:
             return
-        elif isinstance(local_addresses, str):
+        if isinstance(local_addresses, str):
             local_addresses = [local_addresses]
         for address in local_addresses:
             yield address
@@ -123,8 +146,17 @@ class Network:
         for pattern, proxy_urls in self.iter_proxies():
             proxy_settings[pattern] = cycle(proxy_urls)
         while True:
+            # pylint: disable=stop-iteration-return
             yield tuple((pattern, next(proxy_url_cycle)) for pattern, proxy_url_cycle in proxy_settings.items())
+
+    async def log_response(self, response: httpx.Response):
+        request = response.request
+        status = f"{response.status_code} {response.reason_phrase}"
+        response_line = f"{response.http_version} {status}"
+        content_type = response.headers.get("Content-Type")
+        content_type = f' ({content_type})' if content_type else ''
+        self._logger.debug(f'HTTP Request: {request.method} {request.url} "{response_line}"{content_type}')
+
     @staticmethod
     async def check_tor_proxy(client: httpx.AsyncClient, proxies) -> bool:
         if proxies in Network._TOR_CHECK_RESULT:
@@ -135,13 +167,13 @@ class Network:
         for transport in client._mounts.values():  # pylint: disable=protected-access
             if isinstance(transport, AsyncHTTPTransportNoHttp):
                 continue
-            if not getattr(transport, '_rdns', False):
-                result = False
-                break
-        else:
-            response = await client.get('https://check.torproject.org/api/ip')
-            if not response.json()['IsTor']:
+            if not getattr(transport, '_pool') and getattr(transport._pool, '_rdns', False):
                 result = False
+                continue
+            return False
+        response = await client.get("https://check.torproject.org/api/ip", timeout=10)
+        if not response.json()["IsTor"]:
+            result = False
         Network._TOR_CHECK_RESULT[proxies] = result
         return result
@@ -151,6 +183,7 @@ class Network:
         local_address = next(self._local_addresses_cycle)
         proxies = next(self._proxies_cycle)  # is a tuple so it can be part of the key
         key = (verify, max_redirects, local_address, proxies)
+        hook_log_response = self.log_response if searx_debug else None
         if key not in self._clients or self._clients[key].is_closed:
             client = new_client(
                 self.enable_http,
@@ -162,7 +195,8 @@ class Network:
                 dict(proxies),
                 local_address,
                 0,
-                max_redirects
+                max_redirects,
+                hook_log_response,
             )
             if self.using_tor_proxy and not await self.check_tor_proxy(client, proxies):
                 await client.aclose()
@@ -176,51 +210,64 @@ class Network:
                 await client.aclose()
             except httpx.HTTPError:
                 pass
         await asyncio.gather(*[close_client(client) for client in self._clients.values()], return_exceptions=False)

     @staticmethod
-    def get_kwargs_clients(kwargs):
+    def extract_kwargs_clients(kwargs):
         kwargs_clients = {}
         if 'verify' in kwargs:
             kwargs_clients['verify'] = kwargs.pop('verify')
         if 'max_redirects' in kwargs:
             kwargs_clients['max_redirects'] = kwargs.pop('max_redirects')
+        if 'allow_redirects' in kwargs:
+            # see https://github.com/encode/httpx/pull/1808
+            kwargs['follow_redirects'] = kwargs.pop('allow_redirects')
         return kwargs_clients

-    def is_valid_respones(self, response):
-        if (self.retry_on_http_error is True and 400 <= response.status_code <= 599) \
-                or (isinstance(self.retry_on_http_error, list) and response.status_code in self.retry_on_http_error) \
-                or (isinstance(self.retry_on_http_error, int) and response.status_code == self.retry_on_http_error):
+    def is_valid_response(self, response):
+        # pylint: disable=too-many-boolean-expressions
+        if (
+            (self.retry_on_http_error is True and 400 <= response.status_code <= 599)
+            or (isinstance(self.retry_on_http_error, list) and response.status_code in self.retry_on_http_error)
+            or (isinstance(self.retry_on_http_error, int) and response.status_code == self.retry_on_http_error)
+        ):
             return False
         return True

-    async def request(self, method, url, **kwargs):
+    async def call_client(self, stream, method, url, **kwargs):
         retries = self.retries
+        was_disconnected = False
+        kwargs_clients = Network.extract_kwargs_clients(kwargs)
         while retries >= 0:  # pragma: no cover
-            kwargs_clients = Network.get_kwargs_clients(kwargs)
             client = await self.get_client(**kwargs_clients)
             try:
-                response = await client.request(method, url, **kwargs)
-                if self.is_valid_respones(response) or retries <= 0:
+                if stream:
+                    response = client.stream(method, url, **kwargs)
+                else:
+                    response = await client.request(method, url, **kwargs)
+                if self.is_valid_response(response) or retries <= 0:
                     return response
+            except httpx.RemoteProtocolError as e:
+                if not was_disconnected:
+                    # the server has closed the connection:
+                    # try again without decreasing the retries variable & with a new HTTP client
+                    was_disconnected = True
+                    await client.aclose()
+                    self._logger.warning('httpx.RemoteProtocolError: the server has disconnected, retrying')
+                    continue
+                if retries <= 0:
+                    raise e
             except (httpx.RequestError, httpx.HTTPStatusError) as e:
                 if retries <= 0:
                     raise e
             retries -= 1

+    async def request(self, method, url, **kwargs):
+        return await self.call_client(False, method, url, **kwargs)
+
     async def stream(self, method, url, **kwargs):
-        retries = self.retries
-        while retries >= 0:  # pragma: no cover
-            kwargs_clients = Network.get_kwargs_clients(kwargs)
-            client = await self.get_client(**kwargs_clients)
-            try:
-                response = client.stream(method, url, **kwargs)
-                if self.is_valid_respones(response) or retries <= 0:
-                    return response
-            except (httpx.RequestError, httpx.HTTPStatusError) as e:
-                if retries <= 0:
-                    raise e
-            retries -= 1
+        return await self.call_client(True, method, url, **kwargs)

     @classmethod
     async def aclose_all(cls):
@@ -228,8 +275,7 @@ class Network:
 def get_network(name=None):
-    global NETWORKS
-    return NETWORKS[name or DEFAULT_NAME]
+    return NETWORKS.get(name or DEFAULT_NAME)


 def check_network_configuration():
@@ -240,8 +286,10 @@ def check_network_configuration():
         try:
             await network.get_client()
         except Exception:  # pylint: disable=broad-except
+            network._logger.exception('Error')  # pylint: disable=protected-access
             exception_count += 1
         return exception_count
+
     future = asyncio.run_coroutine_threadsafe(check(), get_loop())
     exception_count = future.result()
     if exception_count > 0:
@@ -249,42 +297,39 @@ def check_network_configuration():
 def initialize(settings_engines=None, settings_outgoing=None):
+    # pylint: disable=import-outside-toplevel)
     from searx.engines import engines
     from searx import settings
-    global NETWORKS
-    settings_engines = settings_engines or settings.get('engines')
-    settings_outgoing = settings_outgoing or settings.get('outgoing')
+    # pylint: enable=import-outside-toplevel)
+    settings_engines = settings_engines or settings['engines']
+    settings_outgoing = settings_outgoing or settings['outgoing']
     # default parameters for AsyncHTTPTransport
-    # see https://github.com/encode/httpx/blob/e05a5372eb6172287458b37447c30f650047e1b8/httpx/_transports/default.py#L108-L121  # noqa
+    # see https://github.com/encode/httpx/blob/e05a5372eb6172287458b37447c30f650047e1b8/httpx/_transports/default.py#L108-L121  # nopep8
     default_params = {
         'enable_http': False,
         'verify': True,
         'enable_http2': settings_outgoing.get('enable_http2', True),
-        # Magic number kept from previous code
         'max_connections': settings_outgoing.get('pool_connections', 100),
-        # Picked from constructor
         'max_keepalive_connections': settings_outgoing.get('pool_maxsize', 10),
-        #
         'keepalive_expiry': settings_outgoing.get('keepalive_expiry', 5.0),
-        'local_addresses': settings_outgoing.get('source_ips'),
-        'proxies': settings_outgoing.get('proxies'),
-        'using_tor_proxy': settings_outgoing.get('using_tor_proxy'),
-        # default maximum redirect
-        # from https://github.com/psf/requests/blob/8c211a96cdbe9fe320d63d9e1ae15c5c07e179f8/requests/models.py#L55
+        'local_addresses': settings_outgoing.get('source_ips', []),
+        'using_tor_proxy': settings_outgoing.get('using_tor_proxy', False),
+        'proxies': settings_outgoing.get('proxies', None),
         'max_redirects': settings_outgoing.get('max_redirects', 30),
-        #
         'retries': settings_outgoing.get('retries', 0),
         'retry_on_http_error': None,
     }

-    def new_network(params):
+    def new_network(params, logger_name=None):
         nonlocal default_params
         result = {}
         result.update(default_params)
         result.update(params)
+        if logger_name:
+            result['logger_name'] = logger_name
         return Network(**result)

     def iter_networks():
@@ -300,13 +345,13 @@ def initialize(settings_engines=None, settings_outgoing=None):
     if NETWORKS:
         done()
     NETWORKS.clear()
-    NETWORKS[DEFAULT_NAME] = new_network({})
-    NETWORKS['ipv4'] = new_network({'local_addresses': '0.0.0.0'})
-    NETWORKS['ipv6'] = new_network({'local_addresses': '::'})
+    NETWORKS[DEFAULT_NAME] = new_network({}, logger_name='default')
+    NETWORKS['ipv4'] = new_network({'local_addresses': '0.0.0.0'}, logger_name='ipv4')
+    NETWORKS['ipv6'] = new_network({'local_addresses': '::'}, logger_name='ipv6')
     # define networks from outgoing.networks
     for network_name, network in settings_outgoing.get('networks', {}).items():
-        NETWORKS[network_name] = new_network(network)
+        NETWORKS[network_name] = new_network(network, logger_name=network_name)
     # define networks from engines.[i].network (except references)
     for engine_name, engine, network in iter_networks():
@@ -317,9 +362,9 @@ def initialize(settings_engines=None, settings_outgoing=None):
                     network[attribute_name] = getattr(engine, attribute_name)
                 else:
                     network[attribute_name] = attribute_value
-            NETWORKS[engine_name] = new_network(network)
+            NETWORKS[engine_name] = new_network(network, logger_name=engine_name)
         elif isinstance(network, dict):
-            NETWORKS[engine_name] = new_network(network)
+            NETWORKS[engine_name] = new_network(network, logger_name=engine_name)
     # define networks from engines.[i].network (references)
     for engine_name, engine, network in iter_networks():
@@ -332,7 +377,7 @@ def initialize(settings_engines=None, settings_outgoing=None):
     if 'image_proxy' not in NETWORKS:
         image_proxy_params = default_params.copy()
         image_proxy_params['enable_http2'] = False
-        NETWORKS['image_proxy'] = new_network(image_proxy_params)
+        NETWORKS['image_proxy'] = new_network(image_proxy_params, logger_name='image_proxy')


 @atexit.register


@@ -25,12 +25,13 @@ from _thread import start_new_thread
 from searx import settings
 from searx.answerers import ask
 from searx.external_bang import get_bang_url
+from searx.engines import load_engines
 from searx.results import ResultContainer
 from searx import logger
 from searx.plugins import plugins
 from searx.search.models import EngineRef, SearchQuery
-from searx.search.processors import processors, initialize as initialize_processors
-from searx.network import check_network_configuration
+from searx.search.processors import PROCESSORS, initialize as initialize_processors
+from searx.network import check_network_configuration, initialize as initialize_network
 from searx.search.checker import initialize as initialize_checker
@@ -50,9 +51,12 @@ else:
 def initialize(settings_engines=None, enable_checker=False, check_network=False):
     settings_engines = settings_engines or settings['engines']
-    initialize_processors(settings_engines)
+    load_engines(settings_engines)
+    initialize_network(settings_engines, settings['outgoing'])
     if check_network:
         check_network_configuration()
+    initialize_processors(settings_engines)
     if enable_checker:
         initialize_checker()
@@ -107,7 +111,7 @@ class Search:
         # start search-reqest for all selected engines
         for engineref in self.search_query.engineref_list:
-            processor = processors[engineref.name]
+            processor = PROCESSORS[engineref.name]
             # set default request parameters
             request_params = processor.get_params(self.search_query, engineref.category)
@@ -150,7 +154,7 @@ class Search:
         for engine_name, query, request_params in requests:
             th = threading.Thread(
-                target=processors[engine_name].search,
+                target=PROCESSORS[engine_name].search,
                 args=(query, request_params, self.result_container, self.start_time, self.actual_timeout),
                 name=search_id,
             )


@@ -9,7 +9,7 @@ import signal
 from searx import logger, settings, searx_debug
 from searx.exceptions import SearxSettingsException
-from searx.search.processors import processors
+from searx.search.processors import PROCESSORS
 from searx.search.checker import Checker
 from searx.shared import schedule, storage
@@ -55,7 +55,7 @@ def run():
         'status': 'ok',
         'engines': {}
     }
-    for name, processor in processors.items():
+    for name, processor in PROCESSORS.items():
         logger.debug('Checking %s engine', name)
         checker = Checker(processor)
         checker.run()


@@ -1,5 +1,7 @@
 # SPDX-License-Identifier: AGPL-3.0-or-later
+import threading
 from .online import OnlineProcessor
 from .offline import OfflineProcessor
 from .online_dictionary import OnlineDictionaryProcessor
@@ -10,9 +12,9 @@ import searx.engines as engines
 __all__ = ['EngineProcessor', 'OfflineProcessor', 'OnlineProcessor',
-           'OnlineDictionaryProcessor', 'OnlineCurrencyProcessor', 'processors']
+           'OnlineDictionaryProcessor', 'OnlineCurrencyProcessor', 'PROCESSORS']
 logger = logger.getChild('search.processors')
-processors = {}
+PROCESSORS = {}


 def get_processor_class(engine_type):
@@ -27,15 +29,27 @@ def get_processor(engine, engine_name):
     processor_class = get_processor_class(engine_type)
     if processor_class:
         return processor_class(engine, engine_name)
-    else:
-        return None
+    return None
+
+
+def initialize_processor(processor):
+    """Initialize one processor
+
+    Call the init function of the engine
+    """
+    if processor.has_initialize_function:
+        t = threading.Thread(target=processor.initialize, daemon=True)
+        t.start()


 def initialize(engine_list):
-    engines.initialize_engines(engine_list)
-    for engine_name, engine in engines.engines.items():
-        processor = get_processor(engine, engine_name)
-        if processor is None:
-            logger.error('Error get processor for engine %s', engine_name)
-        else:
-            processors[engine_name] = processor
+    """Initialize all engines and store a processor for each engine in :py:obj:`PROCESSORS`."""
+    for engine_data in engine_list:
+        engine_name = engine_data['name']
+        engine = engines.engines.get(engine_name)
+        if engine:
+            processor = get_processor(engine, engine_name)
+            initialize_processor(processor)
+            if processor is None:
+                engine.logger.error('Error get processor for engine %s', engine_name)
+            else:
+                PROCESSORS[engine_name] = processor
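Note on initialize_processor above: engine init functions now run in fire-and-forget daemon threads instead of being started from searx.engines. A minimal sketch of the same pattern with a hypothetical processor object (DummyProcessor is illustrative only, not part of this commit):

import threading

class DummyProcessor:
    # stand-in for an EngineProcessor whose engine defines an init() function
    has_initialize_function = True

    def initialize(self):
        print('engine init runs here, off the main thread')

processor = DummyProcessor()
if processor.has_initialize_function:
    # same pattern as initialize_processor(): daemon thread, no join
    threading.Thread(target=processor.initialize, daemon=True).start()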


@@ -2,17 +2,32 @@
 from abc import abstractmethod, ABC
 from searx import logger
+from searx.engines import engines
+from searx.utils import get_engine_from_settings
 logger = logger.getChild('searx.search.processor')


 class EngineProcessor(ABC):

     def __init__(self, engine, engine_name):
         self.engine = engine
         self.engine_name = engine_name

+    def initialize(self):
+        try:
+            self.engine.init(get_engine_from_settings(self.engine_name))
+        except SearxEngineResponseException as exc:
+            logger.warn('Fail to initialize %s // %s', self.engine_name, exc)
+        except Exception:  # pylint: disable=broad-except
+            logger.exception('Fail to initialize %s', self.engine_name)
+        else:
+            logger.debug('Initialized %s', self.engine_name)
+
+    @property
+    def has_initialize_function(self):
+        return hasattr(self.engine, 'init')
+
     def get_params(self, search_query, engine_category):
         # if paging is not supported, skip
         if search_query.pageno > 1 and not self.engine.paging:


@@ -77,13 +77,15 @@ class TestNetwork(SearxTestCase):
             'verify': True,
             'max_redirects': 5,
             'timeout': 2,
+            'allow_redirects': True,
         }
-        kwargs_client = Network.get_kwargs_clients(kwargs)
+        kwargs_client = Network.extract_kwargs_clients(kwargs)
         self.assertEqual(len(kwargs_client), 2)
-        self.assertEqual(len(kwargs), 1)
+        self.assertEqual(len(kwargs), 2)
         self.assertEqual(kwargs['timeout'], 2)
+        self.assertEqual(kwargs['follow_redirects'], True)
         self.assertTrue(kwargs_client['verify'])
         self.assertEqual(kwargs_client['max_redirects'], 5)


@@ -23,7 +23,7 @@ class TestEnginesInit(SearxTestCase):
         engine_list = [{'engine': 'dummy', 'name': 'engine1', 'shortcut': 'e1', 'categories': 'general'},
                        {'engine': 'dummy', 'name': 'engine2', 'shortcut': 'e2', 'categories': 'onions'}]
-        engines.initialize_engines(engine_list)
+        engines.load_engines(engine_list)
         self.assertEqual(len(engines.engines), 1)
         self.assertIn('engine1', engines.engines)
         self.assertNotIn('onions', engines.categories)
@@ -35,7 +35,7 @@ class TestEnginesInit(SearxTestCase):
                         'timeout': 20.0, 'onion_url': 'http://engine1.onion'},
                        {'engine': 'dummy', 'name': 'engine2', 'shortcut': 'e2', 'categories': 'onions'}]
-        engines.initialize_engines(engine_list)
+        engines.load_engines(engine_list)
         self.assertEqual(len(engines.engines), 2)
         self.assertIn('engine1', engines.engines)
         self.assertIn('engine2', engines.engines)


@@ -1,11 +1,9 @@
-from mock import patch
+from searx import settings
+from searx.engines import load_engines
 from searx.search import initialize
 from searx.query import RawTextQuery
 from searx.testing import SearxTestCase
-import searx.engines

 TEST_ENGINES = [
     {
@@ -281,10 +279,6 @@ class TestBang(SearxTestCase):
         self.assertEqual(query.getQuery(), '!dum the query')

     def test_bang_autocomplete_empty(self):
-        with patch.object(searx.engines, 'initialize_engines', searx.engines.load_engines):
-            initialize()
-            query = RawTextQuery('the query !', [])
-            self.assertEqual(query.autocomplete_list, ['!images', '!wikipedia', '!osm'])
-            query = RawTextQuery('the query ?', ['osm'])
-            self.assertEqual(query.autocomplete_list, ['?images', '?wikipedia'])
+        load_engines(settings['engines'])
+        query = RawTextQuery('the query !', [])
+        self.assertEqual(query.autocomplete_list, ['!images', '!wikipedia', '!osm'])


@@ -3,16 +3,20 @@
 import json
 from urllib.parse import ParseResult
 from mock import Mock
+import searx.search.processors
 from searx.testing import SearxTestCase
 from searx.search import Search
-import searx.engines


 class ViewsTestCase(SearxTestCase):

     def setUp(self):
         # skip init function (no external HTTP request)
-        self.setattr4test(searx.engines, 'initialize_engines', searx.engines.load_engines)
+        def dummy(*args, **kwargs):
+            pass
+
+        self.setattr4test(searx.search.processors, 'initialize_processor', dummy)

         from searx import webapp  # pylint disable=import-outside-toplevel