[httpx] replace searx.poolrequests by searx.network

settings.yml:

* outgoing.networks:
   * can contains network definition
   * propertiers: enable_http, verify, http2, max_connections, max_keepalive_connections,
     keepalive_expiry, local_addresses, support_ipv4, support_ipv6, proxies, max_redirects, retries
   * retries: 0 by default, number of times searx retries to send the HTTP request (using different IP & proxy each time)
   * local_addresses can be "192.168.0.1/24" (it supports IPv6)
   * support_ipv4 & support_ipv6: both True by default
     see https://github.com/searx/searx/pull/1034
* each engine can define a "network" section:
   * either a full network description
   * either reference an existing network

* all HTTP requests of engine use the same HTTP configuration (it was not the case before, see proxy configuration in master)
This commit is contained in:
Alexandre Flament 2021-04-05 10:43:33 +02:00 committed by Noémi Ványi
parent 88a96baedc
commit 14fe1779b7
31 changed files with 1036 additions and 677 deletions

View File

@ -130,14 +130,12 @@ Global Settings
request_timeout : 2.0 # default timeout in seconds, can be override by engine request_timeout : 2.0 # default timeout in seconds, can be override by engine
# max_request_timeout: 10.0 # the maximum timeout in seconds # max_request_timeout: 10.0 # the maximum timeout in seconds
useragent_suffix : "" # informations like an email address to the administrator useragent_suffix : "" # informations like an email address to the administrator
pool_connections : 100 # Number of different hosts pool_connections : 100 # Maximum number of allowable connections, or None for no limits. The default is 100.
pool_maxsize : 10 # Number of simultaneous requests by host pool_maxsize : 10 # Number of allowable keep-alive connections, or None to always allow. The default is 10.
enable_http2: True # See https://www.python-httpx.org/http2/
# uncomment below section if you want to use a proxy # uncomment below section if you want to use a proxy
# proxies: # proxies:
# http: # all://:
# - http://proxy1:8080
# - http://proxy2:8080
# https:
# - http://proxy1:8080 # - http://proxy1:8080
# - http://proxy2:8080 # - http://proxy2:8080
# uncomment below section only if you have more than one network interface # uncomment below section only if you have more than one network interface
@ -145,6 +143,7 @@ Global Settings
# source_ips: # source_ips:
# - 1.1.1.1 # - 1.1.1.1
# - 1.1.1.2 # - 1.1.1.2
# - fe80::/126
``request_timeout`` : ``request_timeout`` :
@ -157,20 +156,46 @@ Global Settings
Suffix to the user-agent searx uses to send requests to others engines. If an Suffix to the user-agent searx uses to send requests to others engines. If an
engine wish to block you, a contact info here may be useful to avoid that. engine wish to block you, a contact info here may be useful to avoid that.
.. _requests proxies: https://requests.readthedocs.io/en/latest/user/advanced/#proxies ``keepalive_expiry``:
.. _PySocks: https://pypi.org/project/PySocks/ Number of seconds to keep a connection in the pool. By default 5.0 seconds.
.. _httpx proxies: https://www.python-httpx.org/advanced/#http-proxying
``proxies`` : ``proxies`` :
Define one or more proxies you wish to use, see `requests proxies`_. Define one or more proxies you wish to use, see `httpx proxies`_.
If there are more than one proxy for one protocol (http, https), If there are more than one proxy for one protocol (http, https),
requests to the engines are distributed in a round-robin fashion. requests to the engines are distributed in a round-robin fashion.
- Proxy: `see <https://2.python-requests.org/en/latest/user/advanced/#proxies>`__.
- SOCKS proxies are also supported: `see <https://2.python-requests.org/en/latest/user/advanced/#socks>`__
``source_ips`` : ``source_ips`` :
If you use multiple network interfaces, define from which IP the requests must If you use multiple network interfaces, define from which IP the requests must
be made. This parameter is ignored when ``proxies`` is set. be made. Example:
* ``0.0.0.0`` any local IPv4 address.
* ``::`` any local IPv6 address.
* ``192.168.0.1``
* ``[ 192.168.0.1, 192.168.0.2 ]`` these two specific IP addresses
* ``fe80::60a2:1691:e5a2:ee1f``
* ``fe80::60a2:1691:e5a2:ee1f/126`` all IP addresses in this network.
* ``[ 192.168.0.1, fe80::/126 ]``
``retries`` :
Number of retry in case of an HTTP error.
On each retry, searx uses an different proxy and source ip.
``retry_on_http_error`` :
Retry request on some HTTP status code.
Example:
* ``true`` : on HTTP status code between 400 and 599.
* ``403`` : on HTTP status code 403.
* ``[403, 429]``: on HTTP status code 403 and 429.
``enable_http2`` :
Enable by default. Set to ``False`` to disable HTTP/2.
``max_redirects`` :
30 by default. Maximum redirect before it is an error.
``locales:`` ``locales:``
@ -216,6 +241,13 @@ Engine settings
api_key : 'apikey' api_key : 'apikey'
disabled : True disabled : True
language : en_US language : en_US
#enable_http: False
#enable_http2: False
#retries: 1
#retry_on_http_error: True # or 403 or [404, 429]
#max_connections: 100
#max_keepalive_connections: 10
#keepalive_expiry: 5.0
#proxies: #proxies:
# http: # http:
# - http://proxy1:8080 # - http://proxy1:8080
@ -270,6 +302,12 @@ Engine settings
``display_error_messages`` : default ``True`` ``display_error_messages`` : default ``True``
When an engine returns an error, the message is displayed on the user interface. When an engine returns an error, the message is displayed on the user interface.
``network``: optional
Use the network configuration from another engine.
In addition, there are two default networks:
* ``ipv4`` set ``local_addresses`` to ``0.0.0.0`` (use only IPv4 local addresses)
* ``ipv6`` set ``local_addresses`` to ``::`` (use only IPv6 local addresses)
.. note:: .. note::
A few more options are possible, but they are pretty specific to some A few more options are possible, but they are pretty specific to some

View File

@ -16,3 +16,4 @@ sphinx-tabs==2.1.0
sphinxcontrib-programoutput==0.17 sphinxcontrib-programoutput==0.17
sphinx-autobuild==2021.3.14 sphinx-autobuild==2021.3.14
linuxdoc==20210324 linuxdoc==20210324
aiounittest==1.4.0

View File

@ -24,7 +24,7 @@ from httpx import HTTPError
from searx import settings from searx import settings
from searx.poolrequests import get as http_get from searx.network import get as http_get
from searx.exceptions import SearxEngineResponseException from searx.exceptions import SearxEngineResponseException

View File

@ -27,7 +27,7 @@ from searx import settings
from searx import logger from searx import logger
from searx.data import ENGINES_LANGUAGES from searx.data import ENGINES_LANGUAGES
from searx.exceptions import SearxEngineResponseException from searx.exceptions import SearxEngineResponseException
from searx.poolrequests import get, get_proxy_cycles from searx.network import get, initialize as initialize_network, set_context_network_name
from searx.utils import load_module, match_language, get_engine_from_settings, gen_useragent from searx.utils import load_module, match_language, get_engine_from_settings, gen_useragent
@ -89,8 +89,6 @@ def load_engine(engine_data):
engine.categories = [] engine.categories = []
else: else:
engine.categories = list(map(str.strip, param_value.split(','))) engine.categories = list(map(str.strip, param_value.split(',')))
elif param_name == 'proxies':
engine.proxies = get_proxy_cycles(param_value)
else: else:
setattr(engine, param_name, param_value) setattr(engine, param_name, param_value)
@ -289,9 +287,11 @@ def load_engines(engine_list):
def initialize_engines(engine_list): def initialize_engines(engine_list):
load_engines(engine_list) load_engines(engine_list)
initialize_network(engine_list, settings['outgoing'])
def engine_init(engine_name, init_fn): def engine_init(engine_name, init_fn):
try: try:
set_context_network_name(engine_name)
init_fn(get_engine_from_settings(engine_name)) init_fn(get_engine_from_settings(engine_name))
except SearxEngineResponseException as exc: except SearxEngineResponseException as exc:
logger.warn('%s engine: Fail to initialize // %s', engine_name, exc) logger.warn('%s engine: Fail to initialize // %s', engine_name, exc)

View File

@ -6,7 +6,7 @@
from lxml.html import fromstring from lxml.html import fromstring
from json import loads from json import loads
from searx.utils import extract_text, match_language, eval_xpath, dict_subset from searx.utils import extract_text, match_language, eval_xpath, dict_subset
from searx.poolrequests import get from searx.network import get
# about # about
about = { about = {

View File

@ -8,7 +8,7 @@ from urllib.parse import urlencode
from searx.exceptions import SearxEngineAPIException from searx.exceptions import SearxEngineAPIException
from searx.engines.duckduckgo import get_region_code from searx.engines.duckduckgo import get_region_code
from searx.engines.duckduckgo import _fetch_supported_languages, supported_languages_url # NOQA # pylint: disable=unused-import from searx.engines.duckduckgo import _fetch_supported_languages, supported_languages_url # NOQA # pylint: disable=unused-import
from searx.poolrequests import get from searx.network import get
# about # about
about = { about = {

View File

@ -8,7 +8,7 @@ import re
from json import loads from json import loads
from urllib.parse import urlencode from urllib.parse import urlencode
# from searx import logger # from searx import logger
from searx.poolrequests import get from searx.network import get
# about # about
about = { about = {

View File

@ -7,7 +7,7 @@ from flask_babel import gettext
from lxml import etree from lxml import etree
from datetime import datetime from datetime import datetime
from urllib.parse import urlencode from urllib.parse import urlencode
from searx.poolrequests import get from searx.network import get
# about # about
about = { about = {

View File

@ -8,7 +8,7 @@ from json import loads
from urllib.parse import urlencode from urllib.parse import urlencode
from searx.utils import html_to_text, match_language from searx.utils import html_to_text, match_language
from searx.exceptions import SearxEngineAPIException, SearxEngineCaptchaException from searx.exceptions import SearxEngineAPIException, SearxEngineCaptchaException
from searx.raise_for_httperror import raise_for_httperror from searx.network import raise_for_httperror
# about # about
about = { about = {

View File

@ -5,7 +5,7 @@
from urllib.parse import urlencode from urllib.parse import urlencode
from lxml import html from lxml import html
from searx.poolrequests import get from searx.network import get
from searx.exceptions import SearxEngineAccessDeniedException from searx.exceptions import SearxEngineAccessDeniedException
from searx.utils import ( from searx.utils import (
extract_text, extract_text,

View File

@ -9,7 +9,7 @@ from lxml import html
from dateutil import parser from dateutil import parser
from urllib.parse import quote_plus, urlencode from urllib.parse import quote_plus, urlencode
from searx import logger from searx import logger
from searx.poolrequests import get as http_get from searx.network import get as http_get
# about # about
about = { about = {

View File

@ -5,9 +5,10 @@
from json import loads from json import loads
from urllib.parse import urlencode from urllib.parse import urlencode
import searx.poolrequests as requests
import base64 import base64
from searx.network import post as http_post
# about # about
about = { about = {
"website": 'https://www.spotify.com', "website": 'https://www.spotify.com',
@ -38,7 +39,7 @@ def request(query, params):
params['url'] = search_url.format(query=urlencode({'q': query}), offset=offset) params['url'] = search_url.format(query=urlencode({'q': query}), offset=offset)
r = requests.post( r = http_post(
'https://accounts.spotify.com/api/token', 'https://accounts.spotify.com/api/token',
data={'grant_type': 'client_credentials'}, data={'grant_type': 'client_credentials'},
headers={'Authorization': 'Basic ' + base64.b64encode( headers={'Authorization': 'Basic ' + base64.b64encode(

View File

@ -12,7 +12,7 @@ from babel.dates import format_datetime, format_date, format_time, get_datetime_
from searx import logger from searx import logger
from searx.data import WIKIDATA_UNITS from searx.data import WIKIDATA_UNITS
from searx.poolrequests import post, get from searx.network import post, get
from searx.utils import match_language, searx_useragent, get_string_replaces_function from searx.utils import match_language, searx_useragent, get_string_replaces_function
from searx.external_urls import get_external_url, get_earth_coordinates_url, area_to_osm_zoom from searx.external_urls import get_external_url, get_earth_coordinates_url, area_to_osm_zoom
from searx.engines.wikipedia import _fetch_supported_languages, supported_languages_url # NOQA # pylint: disable=unused-import from searx.engines.wikipedia import _fetch_supported_languages, supported_languages_url # NOQA # pylint: disable=unused-import

View File

@ -7,7 +7,7 @@ from urllib.parse import quote
from json import loads from json import loads
from lxml.html import fromstring from lxml.html import fromstring
from searx.utils import match_language, searx_useragent from searx.utils import match_language, searx_useragent
from searx.raise_for_httperror import raise_for_httperror from searx.network import raise_for_httperror
# about # about
about = { about = {

View File

@ -7,7 +7,7 @@ from json import loads
from time import time from time import time
from urllib.parse import urlencode from urllib.parse import urlencode
from searx.poolrequests import get as http_get from searx.network import get as http_get
# about # about
about = { about = {

View File

@ -6,7 +6,7 @@
from lxml.html import fromstring from lxml.html import fromstring
from searx import logger from searx import logger
from searx.utils import extract_text from searx.utils import extract_text
from searx.raise_for_httperror import raise_for_httperror from searx.network import raise_for_httperror
logger = logger.getChild('Wordnik engine') logger = logger.getChild('Wordnik engine')

View File

@ -8,7 +8,7 @@ from operator import itemgetter
from datetime import datetime from datetime import datetime
from urllib.parse import quote from urllib.parse import quote
from searx.utils import extract_text, get_torrent_size from searx.utils import extract_text, get_torrent_size
from searx.poolrequests import get as http_get from searx.network import get as http_get
# about # about
about = { about = {

189
searx/network/__init__.py Normal file
View File

@ -0,0 +1,189 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
import asyncio
import threading
import concurrent.futures
from time import time
import httpx
import h2.exceptions
from .network import get_network, initialize
from .client import LOOP
from .raise_for_httperror import raise_for_httperror
# queue.SimpleQueue: Support Python 3.6
try:
from queue import SimpleQueue
except ImportError:
from queue import Empty
from collections import deque
class SimpleQueue:
"""Minimal backport of queue.SimpleQueue"""
def __init__(self):
self._queue = deque()
self._count = threading.Semaphore(0)
def put(self, item):
self._queue.append(item)
self._count.release()
def get(self):
if not self._count.acquire(True):
raise Empty
return self._queue.popleft()
THREADLOCAL = threading.local()
def reset_time_for_thread():
THREADLOCAL.total_time = 0
def get_time_for_thread():
return THREADLOCAL.total_time
def set_timeout_for_thread(timeout, start_time=None):
THREADLOCAL.timeout = timeout
THREADLOCAL.start_time = start_time
def set_context_network_name(network_name):
THREADLOCAL.network = get_network(network_name)
def get_context_network():
try:
return THREADLOCAL.network
except AttributeError:
return get_network()
def request(method, url, **kwargs):
"""same as requests/requests/api.py request(...)"""
time_before_request = time()
# timeout (httpx)
if 'timeout' in kwargs:
timeout = kwargs['timeout']
else:
timeout = getattr(THREADLOCAL, 'timeout', None)
if timeout is not None:
kwargs['timeout'] = timeout
# 2 minutes timeout for the requests without timeout
timeout = timeout or 120
# ajdust actual timeout
timeout += 0.2 # overhead
start_time = getattr(THREADLOCAL, 'start_time', time_before_request)
if start_time:
timeout -= time() - start_time
# raise_for_error
check_for_httperror = True
if 'raise_for_httperror' in kwargs:
check_for_httperror = kwargs['raise_for_httperror']
del kwargs['raise_for_httperror']
# requests compatibility
if isinstance(url, bytes):
url = url.decode()
# network
network = get_context_network()
# do request
future = asyncio.run_coroutine_threadsafe(network.request(method, url, **kwargs), LOOP)
try:
response = future.result(timeout)
except concurrent.futures.TimeoutError as e:
raise httpx.TimeoutException('Timeout', request=None) from e
# requests compatibility
# see also https://www.python-httpx.org/compatibility/#checking-for-4xx5xx-responses
response.ok = not response.is_error
# update total_time.
# See get_time_for_thread() and reset_time_for_thread()
if hasattr(THREADLOCAL, 'total_time'):
time_after_request = time()
THREADLOCAL.total_time += time_after_request - time_before_request
# raise an exception
if check_for_httperror:
raise_for_httperror(response)
return response
def get(url, **kwargs):
kwargs.setdefault('allow_redirects', True)
return request('get', url, **kwargs)
def options(url, **kwargs):
kwargs.setdefault('allow_redirects', True)
return request('options', url, **kwargs)
def head(url, **kwargs):
kwargs.setdefault('allow_redirects', False)
return request('head', url, **kwargs)
def post(url, data=None, **kwargs):
return request('post', url, data=data, **kwargs)
def put(url, data=None, **kwargs):
return request('put', url, data=data, **kwargs)
def patch(url, data=None, **kwargs):
return request('patch', url, data=data, **kwargs)
def delete(url, **kwargs):
return request('delete', url, **kwargs)
async def stream_chunk_to_queue(network, q, method, url, **kwargs):
try:
async with network.stream(method, url, **kwargs) as response:
q.put(response)
async for chunk in response.aiter_bytes(65536):
if len(chunk) > 0:
q.put(chunk)
except (httpx.HTTPError, OSError, h2.exceptions.ProtocolError) as e:
q.put(e)
finally:
q.put(None)
def stream(method, url, **kwargs):
"""Replace httpx.stream.
Usage:
stream = poolrequests.stream(...)
response = next(stream)
for chunk in stream:
...
httpx.Client.stream requires to write the httpx.HTTPTransport version of the
the httpx.AsyncHTTPTransport declared above.
"""
q = SimpleQueue()
future = asyncio.run_coroutine_threadsafe(stream_chunk_to_queue(get_network(), q, method, url, **kwargs),
LOOP)
chunk_or_exception = q.get()
while chunk_or_exception is not None:
if isinstance(chunk_or_exception, Exception):
raise chunk_or_exception
yield chunk_or_exception
chunk_or_exception = q.get()
return future.result()

214
searx/network/client.py Normal file
View File

@ -0,0 +1,214 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
import asyncio
import logging
import threading
import httpcore
import httpx
from httpx_socks import AsyncProxyTransport
from python_socks import parse_proxy_url
import python_socks._errors
from searx import logger
# Optional uvloop (support Python 3.6)
try:
import uvloop
except ImportError:
pass
else:
uvloop.install()
logger = logger.getChild('searx.http.client')
LOOP = None
TRANSPORT_KWARGS = {
'backend': 'asyncio',
'trust_env': False,
}
async def close_connections_for_url(connection_pool: httpcore.AsyncConnectionPool, url: httpcore._utils.URL):
origin = httpcore._utils.url_to_origin(url)
logger.debug('Drop connections for %r', origin)
connections_to_close = connection_pool._connections_for_origin(origin)
for connection in connections_to_close:
await connection_pool._remove_from_pool(connection)
try:
await connection.aclose()
except httpcore.NetworkError as e:
logger.warning('Error closing an existing connection', exc_info=e)
class AsyncHTTPTransportNoHttp(httpcore.AsyncHTTPTransport):
"""Block HTTP request"""
async def arequest(self, method, url, headers=None, stream=None, ext=None):
raise httpcore.UnsupportedProtocol("HTTP protocol is disabled")
class AsyncProxyTransportFixed(AsyncProxyTransport):
"""Fix httpx_socks.AsyncProxyTransport
Map python_socks exceptions to httpcore.ProxyError
Map socket.gaierror to httpcore.ConnectError
Note: keepalive_expiry is ignored, AsyncProxyTransport should call:
* self._keepalive_sweep()
* self._response_closed(self, connection)
Note: AsyncProxyTransport inherit from AsyncConnectionPool
Note: the API is going to change on httpx 0.18.0
see https://github.com/encode/httpx/pull/1522
"""
async def arequest(self, method, url, headers=None, stream=None, ext=None):
retry = 2
while retry > 0:
retry -= 1
try:
return await super().arequest(method, url, headers, stream, ext)
except (python_socks._errors.ProxyConnectionError,
python_socks._errors.ProxyTimeoutError,
python_socks._errors.ProxyError) as e:
raise httpcore.ProxyError(e)
except OSError as e:
# socket.gaierror when DNS resolution fails
raise httpcore.NetworkError(e)
except httpcore.RemoteProtocolError as e:
# in case of httpcore.RemoteProtocolError: Server disconnected
await close_connections_for_url(self, url)
logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
# retry
except (httpcore.NetworkError, httpcore.ProtocolError) as e:
# httpcore.WriteError on HTTP/2 connection leaves a new opened stream
# then each new request creates a new stream and raise the same WriteError
await close_connections_for_url(self, url)
raise e
class AsyncHTTPTransportFixed(httpx.AsyncHTTPTransport):
"""Fix httpx.AsyncHTTPTransport"""
async def arequest(self, method, url, headers=None, stream=None, ext=None):
retry = 2
while retry > 0:
retry -= 1
try:
return await super().arequest(method, url, headers, stream, ext)
except OSError as e:
# socket.gaierror when DNS resolution fails
raise httpcore.ConnectError(e)
except httpcore.CloseError as e:
# httpcore.CloseError: [Errno 104] Connection reset by peer
# raised by _keepalive_sweep()
# from https://github.com/encode/httpcore/blob/4b662b5c42378a61e54d673b4c949420102379f5/httpcore/_backends/asyncio.py#L198 # noqa
await close_connections_for_url(self._pool, url)
logger.warning('httpcore.CloseError: retry', exc_info=e)
# retry
except httpcore.RemoteProtocolError as e:
# in case of httpcore.RemoteProtocolError: Server disconnected
await close_connections_for_url(self._pool, url)
logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
# retry
except (httpcore.ProtocolError, httpcore.NetworkError) as e:
await close_connections_for_url(self._pool, url)
raise e
def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
global LOOP, TRANSPORT_KWARGS
# support socks5h (requests compatibility):
# https://requests.readthedocs.io/en/master/user/advanced/#socks
# socks5:// hostname is resolved on client side
# socks5h:// hostname is resolved on proxy side
rdns = False
socks5h = 'socks5h://'
if proxy_url.startswith(socks5h):
proxy_url = 'socks5://' + proxy_url[len(socks5h):]
rdns = True
proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url)
return AsyncProxyTransportFixed(proxy_type=proxy_type, proxy_host=proxy_host, proxy_port=proxy_port,
username=proxy_username, password=proxy_password,
rdns=rdns,
loop=LOOP,
verify=verify,
http2=http2,
local_address=local_address,
max_connections=limit.max_connections,
max_keepalive_connections=limit.max_keepalive_connections,
keepalive_expiry=limit.keepalive_expiry,
retries=retries,
**TRANSPORT_KWARGS)
def get_transport(verify, http2, local_address, proxy_url, limit, retries):
return AsyncHTTPTransportFixed(verify=verify,
http2=http2,
local_address=local_address,
proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
limits=limit,
retries=retries,
**TRANSPORT_KWARGS)
def iter_proxies(proxies):
# https://www.python-httpx.org/compatibility/#proxy-keys
if isinstance(proxies, str):
yield 'all://', proxies
elif isinstance(proxies, dict):
for pattern, proxy_url in proxies.items():
yield pattern, proxy_url
def new_client(enable_http, verify, enable_http2,
max_connections, max_keepalive_connections, keepalive_expiry,
proxies, local_address, retries, max_redirects):
limit = httpx.Limits(max_connections=max_connections,
max_keepalive_connections=max_keepalive_connections,
keepalive_expiry=keepalive_expiry)
# See https://www.python-httpx.org/advanced/#routing
mounts = {}
for pattern, proxy_url in iter_proxies(proxies):
if not enable_http and (pattern == 'http' or pattern.startswith('http://')):
continue
if proxy_url.startswith('socks4://') \
or proxy_url.startswith('socks5://') \
or proxy_url.startswith('socks5h://'):
mounts[pattern] = get_transport_for_socks_proxy(verify, enable_http2, local_address, proxy_url, limit,
retries)
else:
mounts[pattern] = get_transport(verify, enable_http2, local_address, proxy_url, limit, retries)
if not enable_http:
mounts['http://'] = AsyncHTTPTransportNoHttp()
transport = get_transport(verify, enable_http2, local_address, None, limit, retries)
return httpx.AsyncClient(transport=transport, mounts=mounts, max_redirects=max_redirects)
def init():
# log
for logger_name in ('hpack.hpack', 'hpack.table'):
logging.getLogger(logger_name).setLevel(logging.WARNING)
# loop
def loop_thread():
global LOOP
LOOP = asyncio.new_event_loop()
LOOP.run_forever()
th = threading.Thread(
target=loop_thread,
name='asyncio_loop',
daemon=True,
)
th.start()
init()

302
searx/network/network.py Normal file
View File

@ -0,0 +1,302 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
import atexit
import asyncio
import ipaddress
from itertools import cycle
import httpx
from .client import new_client, LOOP
DEFAULT_NAME = '__DEFAULT__'
NETWORKS = {}
# requests compatibility when reading proxy settings from settings.yml
PROXY_PATTERN_MAPPING = {
'http': 'http://',
'https': 'https://',
'socks4': 'socks4://',
'socks5': 'socks5://',
'socks5h': 'socks5h://',
'http:': 'http://',
'https:': 'https://',
'socks4:': 'socks4://',
'socks5:': 'socks5://',
'socks5h:': 'socks5h://',
}
ADDRESS_MAPPING = {
'ipv4': '0.0.0.0',
'ipv6': '::'
}
class Network:
__slots__ = ('enable_http', 'verify', 'enable_http2',
'max_connections', 'max_keepalive_connections', 'keepalive_expiry',
'local_addresses', 'proxies', 'max_redirects', 'retries', 'retry_on_http_error',
'_local_addresses_cycle', '_proxies_cycle', '_clients')
def __init__(self,
enable_http=True,
verify=True,
enable_http2=False,
max_connections=None,
max_keepalive_connections=None,
keepalive_expiry=None,
proxies=None,
local_addresses=None,
retries=0,
retry_on_http_error=None,
max_redirects=30):
self.enable_http = enable_http
self.verify = verify
self.enable_http2 = enable_http2
self.max_connections = max_connections
self.max_keepalive_connections = max_keepalive_connections
self.keepalive_expiry = keepalive_expiry
self.proxies = proxies
self.local_addresses = local_addresses
self.retries = retries
self.retry_on_http_error = retry_on_http_error
self.max_redirects = max_redirects
self._local_addresses_cycle = self.get_ipaddress_cycle()
self._proxies_cycle = self.get_proxy_cycles()
self._clients = {}
self.check_parameters()
def check_parameters(self):
for address in self.iter_ipaddresses():
if '/' in address:
ipaddress.ip_network(address, False)
else:
ipaddress.ip_address(address)
if self.proxies is not None and not isinstance(self.proxies, (str, dict)):
raise ValueError('proxies type has to be str, dict or None')
def iter_ipaddresses(self):
local_addresses = self.local_addresses
if not local_addresses:
return
elif isinstance(local_addresses, str):
local_addresses = [local_addresses]
for address in local_addresses:
yield address
def get_ipaddress_cycle(self):
while True:
count = 0
for address in self.iter_ipaddresses():
if '/' in address:
for a in ipaddress.ip_network(address, False).hosts():
yield str(a)
count += 1
else:
a = ipaddress.ip_address(address)
yield str(a)
count += 1
if count == 0:
yield None
def iter_proxies(self):
if not self.proxies:
return
# https://www.python-httpx.org/compatibility/#proxy-keys
if isinstance(self.proxies, str):
yield 'all://', [self.proxies]
else:
for pattern, proxy_url in self.proxies.items():
pattern = PROXY_PATTERN_MAPPING.get(pattern, pattern)
if isinstance(proxy_url, str):
proxy_url = [proxy_url]
yield pattern, proxy_url
def get_proxy_cycles(self):
proxy_settings = {}
for pattern, proxy_urls in self.iter_proxies():
proxy_settings[pattern] = cycle(proxy_urls)
while True:
yield tuple((pattern, next(proxy_url_cycle)) for pattern, proxy_url_cycle in proxy_settings.items())
def get_client(self, verify=None, max_redirects=None):
verify = self.verify if verify is None else verify
max_redirects = self.max_redirects if max_redirects is None else max_redirects
local_address = next(self._local_addresses_cycle)
proxies = next(self._proxies_cycle) # is a tuple so it can be part of the key
key = (verify, max_redirects, local_address, proxies)
if key not in self._clients or self._clients[key].is_closed:
self._clients[key] = new_client(self.enable_http,
verify,
self.enable_http2,
self.max_connections,
self.max_keepalive_connections,
self.keepalive_expiry,
dict(proxies),
local_address,
0,
max_redirects)
return self._clients[key]
async def aclose(self):
async def close_client(client):
try:
await client.aclose()
except httpx.HTTPError:
pass
await asyncio.gather(*[close_client(client) for client in self._clients.values()], return_exceptions=False)
@staticmethod
def get_kwargs_clients(kwargs):
kwargs_clients = {}
if 'verify' in kwargs:
kwargs_clients['verify'] = kwargs.pop('verify')
if 'max_redirects' in kwargs:
kwargs_clients['max_redirects'] = kwargs.pop('max_redirects')
return kwargs_clients
def is_valid_respones(self, response):
if (self.retry_on_http_error is True and 400 <= response.status_code <= 599) \
or (isinstance(self.retry_on_http_error, list) and response.status_code in self.retry_on_http_error) \
or (isinstance(self.retry_on_http_error, int) and response.status_code == self.retry_on_http_error):
return False
return True
async def request(self, method, url, **kwargs):
retries = self.retries
while retries >= 0: # pragma: no cover
kwargs_clients = Network.get_kwargs_clients(kwargs)
client = self.get_client(**kwargs_clients)
try:
response = await client.request(method, url, **kwargs)
if self.is_valid_respones(response) or retries <= 0:
return response
except (httpx.RequestError, httpx.HTTPStatusError) as e:
if retries <= 0:
raise e
retries -= 1
def stream(self, method, url, **kwargs):
retries = self.retries
while retries >= 0: # pragma: no cover
kwargs_clients = Network.get_kwargs_clients(kwargs)
client = self.get_client(**kwargs_clients)
try:
response = client.stream(method, url, **kwargs)
if self.is_valid_respones(response) or retries <= 0:
return response
except (httpx.RequestError, httpx.HTTPStatusError) as e:
if retries <= 0:
raise e
retries -= 1
@classmethod
async def aclose_all(cls):
await asyncio.gather(*[network.aclose() for network in NETWORKS.values()], return_exceptions=False)
def get_network(name=None):
global NETWORKS
return NETWORKS[name or DEFAULT_NAME]
def initialize(settings_engines=None, settings_outgoing=None):
from searx.engines import engines
from searx import settings
global NETWORKS
settings_engines = settings_engines or settings.get('engines')
settings_outgoing = settings_outgoing or settings.get('outgoing')
# default parameters for AsyncHTTPTransport
# see https://github.com/encode/httpx/blob/e05a5372eb6172287458b37447c30f650047e1b8/httpx/_transports/default.py#L108-L121 # noqa
default_params = {
'enable_http': False,
'verify': True,
'enable_http2': settings_outgoing.get('enable_http2', True),
# Magic number kept from previous code
'max_connections': settings_outgoing.get('pool_connections', 100),
# Picked from constructor
'max_keepalive_connections': settings_outgoing.get('pool_maxsize', 10),
#
'keepalive_expiry': settings_outgoing.get('keepalive_expiry', 5.0),
'local_addresses': settings_outgoing.get('source_ips'),
'proxies': settings_outgoing.get('proxies'),
# default maximum redirect
# from https://github.com/psf/requests/blob/8c211a96cdbe9fe320d63d9e1ae15c5c07e179f8/requests/models.py#L55
'max_redirects': settings_outgoing.get('max_redirects', 30),
#
'retries': settings_outgoing.get('retries', 0),
'retry_on_http_error': None,
}
def new_network(params):
nonlocal default_params
result = {}
result.update(default_params)
result.update(params)
return Network(**result)
def iter_networks():
nonlocal settings_engines
for engine_spec in settings_engines:
engine_name = engine_spec['name']
engine = engines.get(engine_name)
if engine is None:
continue
network = getattr(engine, 'network', None)
yield engine_name, engine, network
if NETWORKS:
done()
NETWORKS.clear()
NETWORKS[DEFAULT_NAME] = new_network({})
NETWORKS['ipv4'] = new_network({'local_addresses': '0.0.0.0'})
NETWORKS['ipv6'] = new_network({'local_addresses': '::'})
# define networks from outgoing.networks
for network_name, network in settings_outgoing.get('networks', {}).items():
NETWORKS[network_name] = new_network(network)
# define networks from engines.[i].network (except references)
for engine_name, engine, network in iter_networks():
if network is None:
network = {}
for attribute_name, attribute_value in default_params.items():
if hasattr(engine, attribute_name):
network[attribute_name] = getattr(engine, attribute_name)
else:
network[attribute_name] = attribute_value
NETWORKS[engine_name] = new_network(network)
elif isinstance(network, dict):
NETWORKS[engine_name] = new_network(network)
# define networks from engines.[i].network (references)
for engine_name, engine, network in iter_networks():
if isinstance(network, str):
NETWORKS[engine_name] = NETWORKS[network]
@atexit.register
def done():
"""Close all HTTP client
Avoid a warning at exit
see https://github.com/encode/httpx/blob/1a6e254f72d9fd5694a1c10a28927e193ab4f76b/httpx/_client.py#L1785
Note: since Network.aclose has to be async, it is not possible to call this method on Network.__del__
So Network.aclose is called here using atexit.register
"""
try:
if LOOP:
future = asyncio.run_coroutine_threadsafe(Network.aclose_all(), LOOP)
# wait 3 seconds to close the HTTP clients
future.result(3)
finally:
NETWORKS.clear()
NETWORKS[DEFAULT_NAME] = Network()

View File

@ -1,550 +0,0 @@
import atexit
import sys
import threading
import asyncio
import logging
import concurrent.futures
from time import time
from itertools import cycle
import httpcore
import httpx
import h2.exceptions
from httpx_socks import AsyncProxyTransport
from python_socks import parse_proxy_url
import python_socks._errors
from searx import settings
from searx import logger
from searx.raise_for_httperror import raise_for_httperror
# Optional uvloop (support Python 3.6)
try:
import uvloop
except ImportError:
pass
else:
uvloop.install()
# queue.SimpleQueue: Support Python 3.6
try:
from queue import SimpleQueue
except ImportError:
from queue import Empty
from collections import deque
class SimpleQueue:
"""Minimal backport of queue.SimpleQueue"""
def __init__(self):
self._queue = deque()
self._count = threading.Semaphore(0)
def put(self, item):
self._queue.append(item)
self._count.release()
def get(self, timeout=None):
if not self._count.acquire(True, timeout):
raise Empty
return self._queue.popleft()
logger = logger.getChild('poolrequests')
try:
import ssl
if ssl.OPENSSL_VERSION_INFO[0:3] < (1, 0, 2):
# https://github.com/certifi/python-certifi#1024-bit-root-certificates
logger.critical('You are using an old openssl version({0}), please upgrade above 1.0.2!'
.format(ssl.OPENSSL_VERSION))
sys.exit(1)
except ImportError:
ssl = None
if not getattr(ssl, "HAS_SNI", False):
try:
import OpenSSL # pylint: disable=unused-import
except ImportError:
logger.critical("ssl doesn't support SNI and the pyopenssl module is not installed.\n"
"Some HTTPS connections will fail")
sys.exit(1)
LOOP = None
CLIENTS = dict()
THREADLOCAL = threading.local()
LIMITS = httpx.Limits(
# Magic number kept from previous code
max_connections=settings['outgoing'].get('pool_connections', 100),
# Picked from constructor
max_keepalive_connections=settings['outgoing'].get('pool_maxsize', 10),
#
keepalive_expiry=settings['outgoing'].get('keepalive_expiry', 5.0)
)
# default parameters for AsyncHTTPTransport
# see https://github.com/encode/httpx/blob/e05a5372eb6172287458b37447c30f650047e1b8/httpx/_transports/default.py#L108-L121 # noqa
TRANSPORT_KWARGS = {
'http2': settings['outgoing'].get('http2', False),
'retries': 0,
'trust_env': False,
'backend': 'asyncio'
}
# requests compatibility when reading proxy settings from settings.yml
PROXY_PATTERN_MAPPING = {
'http': 'https://',
'https:': 'https://'
}
# default maximum redirect
# from https://github.com/psf/requests/blob/8c211a96cdbe9fe320d63d9e1ae15c5c07e179f8/requests/models.py#L55
DEFAULT_REDIRECT_LIMIT = 30
if settings['outgoing'].get('source_ips'):
LOCAL_ADDRESS_CYCLE = cycle(settings['outgoing'].get('source_ips'))
else:
LOCAL_ADDRESS_CYCLE = cycle((None, ))
def set_timeout_for_thread(timeout, start_time=None):
THREADLOCAL.timeout = timeout
THREADLOCAL.start_time = start_time
def set_enable_http_protocol(enable_http):
THREADLOCAL.enable_http = enable_http
def get_enable_http_protocol():
try:
return THREADLOCAL.enable_http
except AttributeError:
return False
def reset_time_for_thread():
THREADLOCAL.total_time = 0
def get_time_for_thread():
return THREADLOCAL.total_time
def get_proxy_cycles(proxy_settings):
if not proxy_settings:
return None
# Backwards compatibility for single proxy in settings.yml
for protocol, proxy in proxy_settings.items():
if isinstance(proxy, str):
proxy_settings[protocol] = [proxy]
for protocol in proxy_settings:
proxy_settings[protocol] = cycle(proxy_settings[protocol])
return proxy_settings
GLOBAL_PROXY_CYCLES = get_proxy_cycles(settings['outgoing'].get('proxies'))
def get_proxies(proxy_cycles):
if proxy_cycles:
return {protocol: next(proxy_cycle) for protocol, proxy_cycle in proxy_cycles.items()}
return None
def get_global_proxies():
return get_proxies(GLOBAL_PROXY_CYCLES)
async def close_connections_for_url(connection_pool: httpcore.AsyncConnectionPool, url: httpcore._utils.URL):
origin = httpcore._utils.url_to_origin(url)
logger.debug('Drop connections for %r', origin)
connections_to_close = connection_pool._connections_for_origin(origin)
for connection in connections_to_close:
await connection_pool._remove_from_pool(connection)
try:
await connection.aclose()
except httpcore.NetworkError as e:
logger.warning('Error closing an existing connection', exc_info=e)
class AsyncHTTPTransportNoHttp(httpcore.AsyncHTTPTransport):
"""Block HTTP request"""
async def arequest(self, method, url, headers=None, stream=None, ext=None):
raise httpcore.UnsupportedProtocol("HTTP protocol is disabled")
class AsyncProxyTransportFixed(AsyncProxyTransport):
"""Fix httpx_socks.AsyncProxyTransport
Map python_socks exceptions to httpcore.ProxyError
Map socket.gaierror to httpcore.ConnectError
Note: keepalive_expiry is ignored, AsyncProxyTransport should call:
* self._keepalive_sweep()
* self._response_closed(self, connection)
Note: AsyncProxyTransport inherit from AsyncConnectionPool
Note: the API is going to change on httpx 0.18.0
see https://github.com/encode/httpx/pull/1522
"""
async def arequest(self, method, url, headers=None, stream=None, ext=None):
retry = 2
while retry > 0:
retry -= 1
try:
return await super().arequest(method, url, headers, stream, ext)
except (python_socks._errors.ProxyConnectionError,
python_socks._errors.ProxyTimeoutError,
python_socks._errors.ProxyError) as e:
raise httpcore.ProxyError(e)
except OSError as e:
# socket.gaierror when DNS resolution fails
raise httpcore.NetworkError(e)
except httpcore.RemoteProtocolError as e:
# in case of httpcore.RemoteProtocolError: Server disconnected
await close_connections_for_url(self, url)
logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
# retry
except (httpcore.NetworkError, httpcore.ProtocolError) as e:
# httpcore.WriteError on HTTP/2 connection leaves a new opened stream
# then each new request creates a new stream and raise the same WriteError
await close_connections_for_url(self, url)
raise e
class AsyncHTTPTransportFixed(httpx.AsyncHTTPTransport):
"""Fix httpx.AsyncHTTPTransport"""
async def arequest(self, method, url, headers=None, stream=None, ext=None):
retry = 2
while retry > 0:
retry -= 1
try:
return await super().arequest(method, url, headers, stream, ext)
except OSError as e:
# socket.gaierror when DNS resolution fails
raise httpcore.ConnectError(e)
except httpcore.CloseError as e:
# httpcore.CloseError: [Errno 104] Connection reset by peer
# raised by _keepalive_sweep()
# from https://github.com/encode/httpcore/blob/4b662b5c42378a61e54d673b4c949420102379f5/httpcore/_backends/asyncio.py#L198 # noqa
await close_connections_for_url(self._pool, url)
logger.warning('httpcore.CloseError: retry', exc_info=e)
# retry
except httpcore.RemoteProtocolError as e:
# in case of httpcore.RemoteProtocolError: Server disconnected
await close_connections_for_url(self._pool, url)
logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
# retry
except (httpcore.ProtocolError, httpcore.NetworkError) as e:
await close_connections_for_url(self._pool, url)
raise e
def get_transport_for_socks_proxy(verify, local_address, proxy_url):
global LOOP, LIMITS, TRANSPORT_KWARGS
# support socks5h (requests compatibility):
# https://requests.readthedocs.io/en/master/user/advanced/#socks
# socks5:// hostname is resolved on client side
# socks5h:// hostname is resolved on proxy side
rdns = False
socks5h = 'socks5h://'
if proxy_url.startswith(socks5h):
proxy_url = 'socks5://' + proxy_url[len(socks5h):]
rdns = True
proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url)
return AsyncProxyTransportFixed(proxy_type=proxy_type, proxy_host=proxy_host, proxy_port=proxy_port,
username=proxy_username, password=proxy_password,
rdns=rdns,
loop=LOOP,
verify=verify,
local_address=local_address,
max_connections=LIMITS.max_connections,
max_keepalive_connections=LIMITS.max_keepalive_connections,
keepalive_expiry=LIMITS.keepalive_expiry,
**TRANSPORT_KWARGS)
def get_transport(verify, local_address, proxy_url):
global LIMITS
return AsyncHTTPTransportFixed(verify=verify,
local_address=local_address,
limits=LIMITS,
proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
**TRANSPORT_KWARGS)
def iter_proxies(proxies):
# https://www.python-httpx.org/compatibility/#proxy-keys
if isinstance(proxies, str):
yield 'all://', proxies
elif isinstance(proxies, dict):
for pattern, proxy_url in proxies.items():
pattern = PROXY_PATTERN_MAPPING.get(pattern, pattern)
yield pattern, proxy_url
def new_client(verify, local_address, proxies, max_redirects, enable_http):
# See https://www.python-httpx.org/advanced/#routing
mounts = {}
for pattern, proxy_url in iter_proxies(proxies):
if not enable_http and (pattern == 'http' or pattern.startswith('http://')):
continue
if proxy_url.startswith('socks4://') \
or proxy_url.startswith('socks5://') \
or proxy_url.startswith('socks5h://'):
mounts[pattern] = get_transport_for_socks_proxy(verify, local_address, proxy_url)
else:
mounts[pattern] = get_transport(verify, local_address, proxy_url)
if not enable_http:
mounts['http://'] = AsyncHTTPTransportNoHttp()
transport = get_transport(verify, local_address, None)
return httpx.AsyncClient(transport=transport, mounts=mounts, max_redirects=max_redirects)
def get_client(verify, local_address, proxies, max_redirects, allow_http):
global CLIENTS
key = (verify, local_address, repr(proxies), max_redirects, allow_http)
if key not in CLIENTS:
CLIENTS[key] = new_client(verify, local_address, proxies, max_redirects, allow_http)
return CLIENTS[key]
async def send_request(method, url, enable_http, kwargs):
if isinstance(url, bytes):
url = url.decode()
verify = kwargs.pop('verify', True)
local_address = next(LOCAL_ADDRESS_CYCLE)
proxies = kwargs.pop('proxies', None) or get_global_proxies()
max_redirects = kwargs.pop('max_redirects', DEFAULT_REDIRECT_LIMIT)
client = get_client(verify, local_address, proxies, max_redirects, enable_http)
response = await client.request(method.upper(), url, **kwargs)
# requests compatibility
# see also https://www.python-httpx.org/compatibility/#checking-for-4xx5xx-responses
response.ok = not response.is_error
return response
def request(method, url, **kwargs):
"""same as requests/requests/api.py request(...)"""
time_before_request = time()
# timeout
if 'timeout' in kwargs:
timeout = kwargs['timeout']
else:
timeout = getattr(THREADLOCAL, 'timeout', None)
if timeout is not None:
kwargs['timeout'] = timeout
# raise_for_error
check_for_httperror = True
if 'raise_for_httperror' in kwargs:
check_for_httperror = kwargs['raise_for_httperror']
del kwargs['raise_for_httperror']
# do request
future = asyncio.run_coroutine_threadsafe(send_request(method, url, get_enable_http_protocol(), kwargs), LOOP)
try:
if timeout:
timeout += 0.2 # overhead
start_time = getattr(THREADLOCAL, 'start_time', time_before_request)
if start_time:
timeout -= time() - start_time
response = future.result(timeout or 120)
except concurrent.futures.TimeoutError as e:
raise httpx.TimeoutException('Timeout', request=None) from e
# update total_time.
# See get_time_for_thread() and reset_time_for_thread()
if hasattr(THREADLOCAL, 'total_time'):
time_after_request = time()
THREADLOCAL.total_time += time_after_request - time_before_request
# raise an exception
if check_for_httperror:
raise_for_httperror(response)
return response
async def stream_chunk_to_queue(method, url, q, **kwargs):
verify = kwargs.pop('verify', True)
local_address = next(LOCAL_ADDRESS_CYCLE)
proxies = kwargs.pop('proxies', None) or get_global_proxies()
# "30" from requests:
# https://github.com/psf/requests/blob/8c211a96cdbe9fe320d63d9e1ae15c5c07e179f8/requests/models.py#L55
max_redirects = kwargs.pop('max_redirects', 30)
client = get_client(verify, local_address, proxies, max_redirects, True)
try:
async with client.stream(method, url, **kwargs) as response:
q.put(response)
async for chunk in response.aiter_bytes(65536):
if len(chunk) > 0:
q.put(chunk)
except (httpx.HTTPError, OSError, h2.exceptions.ProtocolError) as e:
q.put(e)
finally:
q.put(None)
def stream(method, url, **kwargs):
"""Replace httpx.stream.
Usage:
stream = poolrequests.stream(...)
response = next(stream)
for chunk in stream:
...
httpx.Client.stream requires to write the httpx.HTTPTransport version of the
the httpx.AsyncHTTPTransport declared above.
"""
q = SimpleQueue()
future = asyncio.run_coroutine_threadsafe(stream_chunk_to_queue(method, url, q, **kwargs), LOOP)
chunk_or_exception = q.get(timeout=60)
while chunk_or_exception is not None:
if isinstance(chunk_or_exception, Exception):
raise chunk_or_exception
yield chunk_or_exception
chunk_or_exception = q.get(timeout=60)
return future.result()
def get(url, **kwargs):
kwargs.setdefault('allow_redirects', True)
return request('get', url, **kwargs)
def options(url, **kwargs):
kwargs.setdefault('allow_redirects', True)
return request('options', url, **kwargs)
def head(url, **kwargs):
kwargs.setdefault('allow_redirects', False)
return request('head', url, **kwargs)
def post(url, data=None, **kwargs):
return request('post', url, data=data, **kwargs)
def put(url, data=None, **kwargs):
return request('put', url, data=data, **kwargs)
def patch(url, data=None, **kwargs):
return request('patch', url, data=data, **kwargs)
def delete(url, **kwargs):
return request('delete', url, **kwargs)
def init():
# log
for logger_name in ('hpack.hpack', 'hpack.table'):
logging.getLogger(logger_name).setLevel(logging.WARNING)
# loop
def loop_thread():
global LOOP
LOOP = asyncio.new_event_loop()
LOOP.run_forever()
th = threading.Thread(
target=loop_thread,
name='asyncio_loop',
daemon=True,
)
th.start()
@atexit.register
def done():
"""Close all HTTP client
Avoid a warning at exit
see https://github.com/encode/httpx/blob/1a6e254f72d9fd5694a1c10a28927e193ab4f76b/httpx/_client.py#L1785
"""
global LOOP
async def close_client(client):
try:
await client.aclose()
except httpx.HTTPError:
pass
async def close_clients():
await asyncio.gather(*[close_client(client) for client in CLIENTS.values()], return_exceptions=False)
future = asyncio.run_coroutine_threadsafe(close_clients(), LOOP)
# wait 3 seconds to close the HTTP clients
future.result(3)
init()
# ## TEMPORARY DEBUG ##
def debug_connection(connection):
now = LOOP.time()
expired = (connection.state == httpcore._async.base.ConnectionState.IDLE
and connection.expires_at is not None
and now >= connection.expires_at)
return connection.info()\
+ (', connect_failed' if connection.connect_failed else '')\
+ (', expired' if expired else '')
def debug_origin(origin):
return origin[0].decode() + '://' + origin[1].decode() + ':' + str(origin[2])
def debug_transport(transport):
result = {
'__class__': str(transport.__class__.__name__)
}
if isinstance(transport, (httpx.AsyncHTTPTransport, AsyncHTTPTransportFixed)):
pool = transport._pool
result['__pool_class__'] = str(pool.__class__.__name__)
if isinstance(pool, httpcore.AsyncConnectionPool):
for origin, connections in pool._connections.items():
result[debug_origin(origin)] = [debug_connection(connection) for connection in connections]
return result
elif isinstance(transport, AsyncProxyTransportFixed):
for origin, connections in transport._connections.items():
result[debug_origin(origin)] = [debug_connection(connection) for connection in connections]
return result
return result
def debug_asyncclient(client, key=None):
result = {}
if key:
result['__key__'] = [k if isinstance(k, (str, int, float, bool, type(None))) else repr(k) for k in key]
result['__default__'] = debug_transport(client._transport)
for urlpattern, transport in client._mounts.items():
result[urlpattern.pattern] = debug_transport(transport)
return result
def debug_asyncclients():
global CLIENTS
return [debug_asyncclient(client, key) for key, client in CLIENTS.items()]

View File

@ -13,7 +13,7 @@ from langdetect import detect_langs
from langdetect.lang_detect_exception import LangDetectException from langdetect.lang_detect_exception import LangDetectException
import httpx import httpx
from searx import poolrequests, logger from searx import network, logger
from searx.results import ResultContainer from searx.results import ResultContainer
from searx.search.models import SearchQuery, EngineRef from searx.search.models import SearchQuery, EngineRef
from searx.search.processors import EngineProcessor from searx.search.processors import EngineProcessor
@ -75,8 +75,8 @@ def _is_url_image(image_url):
while retry > 0: while retry > 0:
a = time() a = time()
try: try:
poolrequests.set_timeout_for_thread(10.0, time()) network.set_timeout_for_thread(10.0, time())
r = poolrequests.get(image_url, timeout=10.0, allow_redirects=True, headers={ r = network.get(image_url, timeout=10.0, allow_redirects=True, headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:84.0) Gecko/20100101 Firefox/84.0', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:84.0) Gecko/20100101 Firefox/84.0',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US;q=0.5,en;q=0.3', 'Accept-Language': 'en-US;q=0.5,en;q=0.3',

View File

@ -6,7 +6,7 @@ import asyncio
import httpx import httpx
import searx.poolrequests as poolrequests import searx.network
from searx.engines import settings from searx.engines import settings
from searx import logger from searx import logger
from searx.utils import gen_useragent from searx.utils import gen_useragent
@ -64,10 +64,6 @@ class OnlineProcessor(EngineProcessor):
auth=params['auth'] auth=params['auth']
) )
# setting engine based proxies
if hasattr(self.engine, 'proxies'):
request_args['proxies'] = poolrequests.get_proxies(self.engine.proxies)
# max_redirects # max_redirects
max_redirects = params.get('max_redirects') max_redirects = params.get('max_redirects')
if max_redirects: if max_redirects:
@ -85,9 +81,9 @@ class OnlineProcessor(EngineProcessor):
# specific type of request (GET or POST) # specific type of request (GET or POST)
if params['method'] == 'GET': if params['method'] == 'GET':
req = poolrequests.get req = searx.network.get
else: else:
req = poolrequests.post req = searx.network.post
request_args['data'] = params['data'] request_args['data'] = params['data']
@ -128,11 +124,11 @@ class OnlineProcessor(EngineProcessor):
def search(self, query, params, result_container, start_time, timeout_limit): def search(self, query, params, result_container, start_time, timeout_limit):
# set timeout for all HTTP requests # set timeout for all HTTP requests
poolrequests.set_timeout_for_thread(timeout_limit, start_time=start_time) searx.network.set_timeout_for_thread(timeout_limit, start_time=start_time)
# reset the HTTP total time # reset the HTTP total time
poolrequests.reset_time_for_thread() searx.network.reset_time_for_thread()
# enable HTTP only if explicitly enabled # set the network
poolrequests.set_enable_http_protocol(self.engine.enable_http) searx.network.set_context_network_name(self.engine_name)
# suppose everything will be alright # suppose everything will be alright
http_exception = False http_exception = False
@ -149,7 +145,7 @@ class OnlineProcessor(EngineProcessor):
# update engine time when there is no exception # update engine time when there is no exception
engine_time = time() - start_time engine_time = time() - start_time
page_load_time = poolrequests.get_time_for_thread() page_load_time = searx.network.get_time_for_thread()
result_container.add_timing(self.engine_name, engine_time, page_load_time) result_container.add_timing(self.engine_name, engine_time, page_load_time)
with threading.RLock(): with threading.RLock():
self.engine.stats['engine_time'] += engine_time self.engine.stats['engine_time'] += engine_time
@ -162,7 +158,7 @@ class OnlineProcessor(EngineProcessor):
# Timing # Timing
engine_time = time() - start_time engine_time = time() - start_time
page_load_time = poolrequests.get_time_for_thread() page_load_time = searx.network.get_time_for_thread()
result_container.add_timing(self.engine_name, engine_time, page_load_time) result_container.add_timing(self.engine_name, engine_time, page_load_time)
# Record the errors # Record the errors

View File

@ -72,16 +72,12 @@ outgoing: # communication with search engines
useragent_suffix : "" # suffix of searx_useragent, could contain informations like an email address to the administrator useragent_suffix : "" # suffix of searx_useragent, could contain informations like an email address to the administrator
pool_connections : 100 # The maximum number of concurrent connections that may be established. pool_connections : 100 # The maximum number of concurrent connections that may be established.
pool_maxsize : 20 # Allow the connection pool to maintain keep-alive connections below this point. pool_maxsize : 20 # Allow the connection pool to maintain keep-alive connections below this point.
keepalive_expiry: 30.0 # Number of seconds to keep a connection in the pool enable_http2: True # See https://www.python-httpx.org/http2/
http2: True # Enable HTTP/2 (experimental)
# uncomment below section if you want to use a proxy # uncomment below section if you want to use a proxy
# see https://2.python-requests.org/en/latest/user/advanced/#proxies # see https://2.python-requests.org/en/latest/user/advanced/#proxies
# SOCKS proxies are also supported: see https://2.python-requests.org/en/latest/user/advanced/#socks # SOCKS proxies are also supported: see https://2.python-requests.org/en/latest/user/advanced/#socks
# proxies: # proxies:
# http: # all://:
# - http://proxy1:8080
# - http://proxy2:8080
# https:
# - http://proxy1:8080 # - http://proxy1:8080
# - http://proxy2:8080 # - http://proxy2:8080
# using_tor_proxy : True # using_tor_proxy : True
@ -91,6 +87,7 @@ outgoing: # communication with search engines
# source_ips: # source_ips:
# - 1.1.1.1 # - 1.1.1.1
# - 1.1.1.2 # - 1.1.1.2
# - fe80::/126
# External plugin configuration # External plugin configuration
# See http://searx.github.io/searx/dev/plugins.html for more details # See http://searx.github.io/searx/dev/plugins.html for more details
@ -889,11 +886,13 @@ engines:
engine : qwant engine : qwant
shortcut : qwi shortcut : qwi
categories : images categories : images
network: qwant
- name : qwant news - name : qwant news
engine : qwant engine : qwant
shortcut : qwn shortcut : qwn
categories : news categories : news
network: qwant
# - name: library # - name: library
# engine: recoll # engine: recoll

View File

@ -10,8 +10,8 @@ import traceback
from os.path import dirname, join, abspath, realpath from os.path import dirname, join, abspath, realpath
from unittest import TestCase
from splinter import Browser from splinter import Browser
import aiounittest
class SearxTestLayer: class SearxTestLayer:
@ -82,7 +82,7 @@ def run_robot_tests(tests):
test(browser) test(browser)
class SearxTestCase(TestCase): class SearxTestCase(aiounittest.AsyncTestCase):
"""Base test case for non-robot tests.""" """Base test case for non-robot tests."""
layer = SearxTestLayer layer = SearxTestLayer

View File

@ -93,7 +93,7 @@ from searx.plugins import plugins
from searx.plugins.oa_doi_rewrite import get_doi_resolver from searx.plugins.oa_doi_rewrite import get_doi_resolver
from searx.preferences import Preferences, ValidationException, LANGUAGE_CODES from searx.preferences import Preferences, ValidationException, LANGUAGE_CODES
from searx.answerers import answerers from searx.answerers import answerers
from searx import poolrequests from searx.network import stream as http_stream
from searx.answerers import ask from searx.answerers import ask
from searx.metrology.error_recorder import errors_per_engines from searx.metrology.error_recorder import errors_per_engines
@ -928,7 +928,7 @@ def image_proxy():
try: try:
headers = dict_subset(request.headers, {'If-Modified-Since', 'If-None-Match'}) headers = dict_subset(request.headers, {'If-Modified-Since', 'If-None-Match'})
headers['User-Agent'] = gen_useragent() headers['User-Agent'] = gen_useragent()
stream = poolrequests.stream( stream = http_stream(
method='GET', method='GET',
url=url, url=url,
headers=headers, headers=headers,
@ -1118,11 +1118,6 @@ def config():
}) })
@app.route('/config/http')
def config_http():
return jsonify(poolrequests.debug_asyncclients())
@app.errorhandler(404) @app.errorhandler(404)
def page_not_found(e): def page_not_found(e):
return render('404.html'), 404 return render('404.html'), 404

View File

@ -10,7 +10,7 @@ from searx.engines.wikidata import send_wikidata_query
from searx.utils import extract_text from searx.utils import extract_text
import searx import searx
import searx.search import searx.search
import searx.poolrequests import searx.network
SPARQL_WIKIPEDIA_ARTICLE = """ SPARQL_WIKIPEDIA_ARTICLE = """
SELECT DISTINCT ?item ?name SELECT DISTINCT ?item ?name
@ -59,7 +59,7 @@ def get_wikipedia_summary(language, pageid):
search_url = 'https://{language}.wikipedia.org/api/rest_v1/page/summary/{title}' search_url = 'https://{language}.wikipedia.org/api/rest_v1/page/summary/{title}'
url = search_url.format(title=quote(pageid), language=language) url = search_url.format(title=quote(pageid), language=language)
try: try:
response = searx.poolrequests.get(url) response = searx.network.get(url)
response.raise_for_status() response.raise_for_status()
api_result = json.loads(response.text) api_result = json.loads(response.text)
return api_result.get('extract') return api_result.get('extract')
@ -89,7 +89,7 @@ def get_website_description(url, lang1, lang2=None):
lang_list.append(lang2) lang_list.append(lang2)
headers['Accept-Language'] = f'{",".join(lang_list)};q=0.8' headers['Accept-Language'] = f'{",".join(lang_list)};q=0.8'
try: try:
response = searx.poolrequests.get(url, headers=headers, timeout=10) response = searx.network.get(url, headers=headers, timeout=10)
response.raise_for_status() response.raise_for_status()
except Exception: except Exception:
return (None, None) return (None, None)

View File

View File

@ -0,0 +1,236 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
from mock import patch
import httpx
from searx.network.network import Network, NETWORKS
from searx.testing import SearxTestCase
class TestNetwork(SearxTestCase):
def test_simple(self):
network = Network()
self.assertEqual(next(network._local_addresses_cycle), None)
self.assertEqual(next(network._proxies_cycle), ())
def test_ipaddress_cycle(self):
network = NETWORKS['ipv6']
self.assertEqual(next(network._local_addresses_cycle), '::')
self.assertEqual(next(network._local_addresses_cycle), '::')
network = NETWORKS['ipv4']
self.assertEqual(next(network._local_addresses_cycle), '0.0.0.0')
self.assertEqual(next(network._local_addresses_cycle), '0.0.0.0')
network = Network(local_addresses=['192.168.0.1', '192.168.0.2'])
self.assertEqual(next(network._local_addresses_cycle), '192.168.0.1')
self.assertEqual(next(network._local_addresses_cycle), '192.168.0.2')
self.assertEqual(next(network._local_addresses_cycle), '192.168.0.1')
network = Network(local_addresses=['192.168.0.0/30'])
self.assertEqual(next(network._local_addresses_cycle), '192.168.0.1')
self.assertEqual(next(network._local_addresses_cycle), '192.168.0.2')
self.assertEqual(next(network._local_addresses_cycle), '192.168.0.1')
self.assertEqual(next(network._local_addresses_cycle), '192.168.0.2')
network = Network(local_addresses=['fe80::/10'])
self.assertEqual(next(network._local_addresses_cycle), 'fe80::1')
self.assertEqual(next(network._local_addresses_cycle), 'fe80::2')
self.assertEqual(next(network._local_addresses_cycle), 'fe80::3')
with self.assertRaises(ValueError):
Network(local_addresses=['not_an_ip_address'])
def test_proxy_cycles(self):
network = Network(proxies='http://localhost:1337')
self.assertEqual(next(network._proxies_cycle), (('all://', 'http://localhost:1337'),))
network = Network(proxies={
'https': 'http://localhost:1337',
'http': 'http://localhost:1338'
})
self.assertEqual(next(network._proxies_cycle),
(('https://', 'http://localhost:1337'), ('http://', 'http://localhost:1338')))
self.assertEqual(next(network._proxies_cycle),
(('https://', 'http://localhost:1337'), ('http://', 'http://localhost:1338')))
network = Network(proxies={
'https': ['http://localhost:1337', 'http://localhost:1339'],
'http': 'http://localhost:1338'
})
self.assertEqual(next(network._proxies_cycle),
(('https://', 'http://localhost:1337'), ('http://', 'http://localhost:1338')))
self.assertEqual(next(network._proxies_cycle),
(('https://', 'http://localhost:1339'), ('http://', 'http://localhost:1338')))
with self.assertRaises(ValueError):
Network(proxies=1)
def test_get_kwargs_clients(self):
kwargs = {
'verify': True,
'max_redirects': 5,
'timeout': 2,
}
kwargs_client = Network.get_kwargs_clients(kwargs)
self.assertEqual(len(kwargs_client), 2)
self.assertEqual(len(kwargs), 1)
self.assertEqual(kwargs['timeout'], 2)
self.assertTrue(kwargs_client['verify'])
self.assertEqual(kwargs_client['max_redirects'], 5)
async def test_get_client(self):
network = Network(verify=True)
client1 = network.get_client()
client2 = network.get_client(verify=True)
client3 = network.get_client(max_redirects=10)
client4 = network.get_client(verify=True)
client5 = network.get_client(verify=False)
client6 = network.get_client(max_redirects=10)
self.assertEqual(client1, client2)
self.assertEqual(client1, client4)
self.assertNotEqual(client1, client3)
self.assertNotEqual(client1, client5)
self.assertEqual(client3, client6)
await network.aclose()
async def test_aclose(self):
network = Network(verify=True)
network.get_client()
await network.aclose()
async def test_request(self):
a_text = 'Lorem Ipsum'
response = httpx.Response(status_code=200, text=a_text)
with patch.object(httpx.AsyncClient, 'request', return_value=response):
network = Network(enable_http=True)
response = await network.request('GET', 'https://example.com/')
self.assertEqual(response.text, a_text)
await network.aclose()
class TestNetworkRequestRetries(SearxTestCase):
TEXT = 'Lorem Ipsum'
@classmethod
def get_response_404_then_200(cls):
first = True
async def get_response(*args, **kwargs):
nonlocal first
if first:
first = False
return httpx.Response(status_code=403, text=TestNetworkRequestRetries.TEXT)
return httpx.Response(status_code=200, text=TestNetworkRequestRetries.TEXT)
return get_response
async def test_retries_ok(self):
with patch.object(httpx.AsyncClient, 'request', new=TestNetworkRequestRetries.get_response_404_then_200()):
network = Network(enable_http=True, retries=1, retry_on_http_error=403)
response = await network.request('GET', 'https://example.com/')
self.assertEqual(response.text, TestNetworkRequestRetries.TEXT)
await network.aclose()
async def test_retries_fail_int(self):
with patch.object(httpx.AsyncClient, 'request', new=TestNetworkRequestRetries.get_response_404_then_200()):
network = Network(enable_http=True, retries=0, retry_on_http_error=403)
response = await network.request('GET', 'https://example.com/')
self.assertEqual(response.status_code, 403)
await network.aclose()
async def test_retries_fail_list(self):
with patch.object(httpx.AsyncClient, 'request', new=TestNetworkRequestRetries.get_response_404_then_200()):
network = Network(enable_http=True, retries=0, retry_on_http_error=[403, 429])
response = await network.request('GET', 'https://example.com/')
self.assertEqual(response.status_code, 403)
await network.aclose()
async def test_retries_fail_bool(self):
with patch.object(httpx.AsyncClient, 'request', new=TestNetworkRequestRetries.get_response_404_then_200()):
network = Network(enable_http=True, retries=0, retry_on_http_error=True)
response = await network.request('GET', 'https://example.com/')
self.assertEqual(response.status_code, 403)
await network.aclose()
async def test_retries_exception_then_200(self):
request_count = 0
async def get_response(*args, **kwargs):
nonlocal request_count
request_count += 1
if request_count < 3:
raise httpx.RequestError('fake exception', request=None)
return httpx.Response(status_code=200, text=TestNetworkRequestRetries.TEXT)
with patch.object(httpx.AsyncClient, 'request', new=get_response):
network = Network(enable_http=True, retries=2)
response = await network.request('GET', 'https://example.com/')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.text, TestNetworkRequestRetries.TEXT)
await network.aclose()
async def test_retries_exception(self):
async def get_response(*args, **kwargs):
raise httpx.RequestError('fake exception', request=None)
with patch.object(httpx.AsyncClient, 'request', new=get_response):
network = Network(enable_http=True, retries=0)
with self.assertRaises(httpx.RequestError):
await network.request('GET', 'https://example.com/')
await network.aclose()
class TestNetworkStreamRetries(SearxTestCase):
TEXT = 'Lorem Ipsum'
@classmethod
def get_response_exception_then_200(cls):
first = True
def stream(*args, **kwargs):
nonlocal first
if first:
first = False
raise httpx.RequestError('fake exception', request=None)
return httpx.Response(status_code=200, text=TestNetworkStreamRetries.TEXT)
return stream
async def test_retries_ok(self):
with patch.object(httpx.AsyncClient, 'stream', new=TestNetworkStreamRetries.get_response_exception_then_200()):
network = Network(enable_http=True, retries=1, retry_on_http_error=403)
response = network.stream('GET', 'https://example.com/')
self.assertEqual(response.text, TestNetworkStreamRetries.TEXT)
await network.aclose()
async def test_retries_fail(self):
with patch.object(httpx.AsyncClient, 'stream', new=TestNetworkStreamRetries.get_response_exception_then_200()):
network = Network(enable_http=True, retries=0, retry_on_http_error=403)
with self.assertRaises(httpx.RequestError):
network.stream('GET', 'https://example.com/')
await network.aclose()
async def test_retries_exception(self):
first = True
def stream(*args, **kwargs):
nonlocal first
if first:
first = False
return httpx.Response(status_code=403, text=TestNetworkRequestRetries.TEXT)
return httpx.Response(status_code=200, text=TestNetworkRequestRetries.TEXT)
with patch.object(httpx.AsyncClient, 'stream', new=stream):
network = Network(enable_http=True, retries=0, retry_on_http_error=403)
response = network.stream('GET', 'https://example.com/')
self.assertEqual(response.status_code, 403)
await network.aclose()

View File

@ -1,62 +0,0 @@
from searx.testing import SearxTestCase
from searx.poolrequests import get_proxy_cycles, get_proxies
CONFIG = {'http': ['http://localhost:9090', 'http://localhost:9092'],
'https': ['http://localhost:9091', 'http://localhost:9093']}
class TestProxy(SearxTestCase):
def test_noconfig(self):
cycles = get_proxy_cycles(None)
self.assertIsNone(cycles)
cycles = get_proxy_cycles(False)
self.assertIsNone(cycles)
def test_oldconfig(self):
config = {
'http': 'http://localhost:9090',
'https': 'http://localhost:9091',
}
cycles = get_proxy_cycles(config)
self.assertEqual(next(cycles['http']), 'http://localhost:9090')
self.assertEqual(next(cycles['http']), 'http://localhost:9090')
self.assertEqual(next(cycles['https']), 'http://localhost:9091')
self.assertEqual(next(cycles['https']), 'http://localhost:9091')
def test_one_proxy(self):
config = {
'http': ['http://localhost:9090'],
'https': ['http://localhost:9091'],
}
cycles = get_proxy_cycles(config)
self.assertEqual(next(cycles['http']), 'http://localhost:9090')
self.assertEqual(next(cycles['http']), 'http://localhost:9090')
self.assertEqual(next(cycles['https']), 'http://localhost:9091')
self.assertEqual(next(cycles['https']), 'http://localhost:9091')
def test_multiple_proxies(self):
cycles = get_proxy_cycles(CONFIG)
self.assertEqual(next(cycles['http']), 'http://localhost:9090')
self.assertEqual(next(cycles['http']), 'http://localhost:9092')
self.assertEqual(next(cycles['http']), 'http://localhost:9090')
self.assertEqual(next(cycles['https']), 'http://localhost:9091')
self.assertEqual(next(cycles['https']), 'http://localhost:9093')
self.assertEqual(next(cycles['https']), 'http://localhost:9091')
def test_getproxies_none(self):
self.assertIsNone(get_proxies(None))
def test_getproxies_config(self):
cycles = get_proxy_cycles(CONFIG)
self.assertEqual(get_proxies(cycles), {
'http': 'http://localhost:9090',
'https': 'http://localhost:9091'
})
self.assertEqual(get_proxies(cycles), {
'http': 'http://localhost:9092',
'https': 'http://localhost:9093'
})