mirror of https://github.com/searx/searx
Merge pull request #3015 from MarcAbonce/verify_tor_on_start2
Verify that Tor proxy works every time searx starts
This commit is contained in:
commit
7b368146a1
|
@ -8,7 +8,7 @@ from time import time
|
||||||
import httpx
|
import httpx
|
||||||
import h2.exceptions
|
import h2.exceptions
|
||||||
|
|
||||||
from .network import get_network, initialize
|
from .network import get_network, initialize, check_network_configuration
|
||||||
from .client import get_loop
|
from .client import get_loop
|
||||||
from .raise_for_httperror import raise_for_httperror
|
from .raise_for_httperror import raise_for_httperror
|
||||||
|
|
||||||
|
@ -154,7 +154,7 @@ def delete(url, **kwargs):
|
||||||
|
|
||||||
async def stream_chunk_to_queue(network, q, method, url, **kwargs):
|
async def stream_chunk_to_queue(network, q, method, url, **kwargs):
|
||||||
try:
|
try:
|
||||||
async with network.stream(method, url, **kwargs) as response:
|
async with await network.stream(method, url, **kwargs) as response:
|
||||||
q.put(response)
|
q.put(response)
|
||||||
async for chunk in response.aiter_bytes(65536):
|
async for chunk in response.aiter_bytes(65536):
|
||||||
if len(chunk) > 0:
|
if len(chunk) > 0:
|
||||||
|
|
|
@ -7,7 +7,7 @@ from itertools import cycle
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
from .client import new_client, get_loop
|
from .client import new_client, get_loop, AsyncHTTPTransportNoHttp
|
||||||
|
|
||||||
|
|
||||||
DEFAULT_NAME = '__DEFAULT__'
|
DEFAULT_NAME = '__DEFAULT__'
|
||||||
|
@ -36,9 +36,11 @@ class Network:
|
||||||
|
|
||||||
__slots__ = ('enable_http', 'verify', 'enable_http2',
|
__slots__ = ('enable_http', 'verify', 'enable_http2',
|
||||||
'max_connections', 'max_keepalive_connections', 'keepalive_expiry',
|
'max_connections', 'max_keepalive_connections', 'keepalive_expiry',
|
||||||
'local_addresses', 'proxies', 'max_redirects', 'retries', 'retry_on_http_error',
|
'local_addresses', 'proxies', 'using_tor_proxy', 'max_redirects', 'retries', 'retry_on_http_error',
|
||||||
'_local_addresses_cycle', '_proxies_cycle', '_clients')
|
'_local_addresses_cycle', '_proxies_cycle', '_clients')
|
||||||
|
|
||||||
|
_TOR_CHECK_RESULT = {}
|
||||||
|
|
||||||
def __init__(self,
|
def __init__(self,
|
||||||
enable_http=True,
|
enable_http=True,
|
||||||
verify=True,
|
verify=True,
|
||||||
|
@ -47,6 +49,7 @@ class Network:
|
||||||
max_keepalive_connections=None,
|
max_keepalive_connections=None,
|
||||||
keepalive_expiry=None,
|
keepalive_expiry=None,
|
||||||
proxies=None,
|
proxies=None,
|
||||||
|
using_tor_proxy=False,
|
||||||
local_addresses=None,
|
local_addresses=None,
|
||||||
retries=0,
|
retries=0,
|
||||||
retry_on_http_error=None,
|
retry_on_http_error=None,
|
||||||
|
@ -58,6 +61,7 @@ class Network:
|
||||||
self.max_keepalive_connections = max_keepalive_connections
|
self.max_keepalive_connections = max_keepalive_connections
|
||||||
self.keepalive_expiry = keepalive_expiry
|
self.keepalive_expiry = keepalive_expiry
|
||||||
self.proxies = proxies
|
self.proxies = proxies
|
||||||
|
self.using_tor_proxy = using_tor_proxy
|
||||||
self.local_addresses = local_addresses
|
self.local_addresses = local_addresses
|
||||||
self.retries = retries
|
self.retries = retries
|
||||||
self.retry_on_http_error = retry_on_http_error
|
self.retry_on_http_error = retry_on_http_error
|
||||||
|
@ -121,23 +125,49 @@ class Network:
|
||||||
while True:
|
while True:
|
||||||
yield tuple((pattern, next(proxy_url_cycle)) for pattern, proxy_url_cycle in proxy_settings.items())
|
yield tuple((pattern, next(proxy_url_cycle)) for pattern, proxy_url_cycle in proxy_settings.items())
|
||||||
|
|
||||||
def get_client(self, verify=None, max_redirects=None):
|
@staticmethod
|
||||||
|
async def check_tor_proxy(client: httpx.AsyncClient, proxies) -> bool:
|
||||||
|
if proxies in Network._TOR_CHECK_RESULT:
|
||||||
|
return Network._TOR_CHECK_RESULT[proxies]
|
||||||
|
|
||||||
|
result = True
|
||||||
|
# ignore client._transport because it is not used with all://
|
||||||
|
for transport in client._mounts.values(): # pylint: disable=protected-access
|
||||||
|
if isinstance(transport, AsyncHTTPTransportNoHttp):
|
||||||
|
continue
|
||||||
|
if not getattr(transport, '_rdns', False):
|
||||||
|
result = False
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
response = await client.get('https://check.torproject.org/api/ip')
|
||||||
|
if not response.json()['IsTor']:
|
||||||
|
result = False
|
||||||
|
Network._TOR_CHECK_RESULT[proxies] = result
|
||||||
|
return result
|
||||||
|
|
||||||
|
async def get_client(self, verify=None, max_redirects=None):
|
||||||
verify = self.verify if verify is None else verify
|
verify = self.verify if verify is None else verify
|
||||||
max_redirects = self.max_redirects if max_redirects is None else max_redirects
|
max_redirects = self.max_redirects if max_redirects is None else max_redirects
|
||||||
local_address = next(self._local_addresses_cycle)
|
local_address = next(self._local_addresses_cycle)
|
||||||
proxies = next(self._proxies_cycle) # is a tuple so it can be part of the key
|
proxies = next(self._proxies_cycle) # is a tuple so it can be part of the key
|
||||||
key = (verify, max_redirects, local_address, proxies)
|
key = (verify, max_redirects, local_address, proxies)
|
||||||
if key not in self._clients or self._clients[key].is_closed:
|
if key not in self._clients or self._clients[key].is_closed:
|
||||||
self._clients[key] = new_client(self.enable_http,
|
client = new_client(
|
||||||
verify,
|
self.enable_http,
|
||||||
self.enable_http2,
|
verify,
|
||||||
self.max_connections,
|
self.enable_http2,
|
||||||
self.max_keepalive_connections,
|
self.max_connections,
|
||||||
self.keepalive_expiry,
|
self.max_keepalive_connections,
|
||||||
dict(proxies),
|
self.keepalive_expiry,
|
||||||
local_address,
|
dict(proxies),
|
||||||
0,
|
local_address,
|
||||||
max_redirects)
|
0,
|
||||||
|
max_redirects
|
||||||
|
)
|
||||||
|
if self.using_tor_proxy and not await self.check_tor_proxy(client, proxies):
|
||||||
|
await client.aclose()
|
||||||
|
raise httpx.ProxyError('Network configuration problem: not using Tor')
|
||||||
|
self._clients[key] = client
|
||||||
return self._clients[key]
|
return self._clients[key]
|
||||||
|
|
||||||
async def aclose(self):
|
async def aclose(self):
|
||||||
|
@ -168,7 +198,7 @@ class Network:
|
||||||
retries = self.retries
|
retries = self.retries
|
||||||
while retries >= 0: # pragma: no cover
|
while retries >= 0: # pragma: no cover
|
||||||
kwargs_clients = Network.get_kwargs_clients(kwargs)
|
kwargs_clients = Network.get_kwargs_clients(kwargs)
|
||||||
client = self.get_client(**kwargs_clients)
|
client = await self.get_client(**kwargs_clients)
|
||||||
try:
|
try:
|
||||||
response = await client.request(method, url, **kwargs)
|
response = await client.request(method, url, **kwargs)
|
||||||
if self.is_valid_respones(response) or retries <= 0:
|
if self.is_valid_respones(response) or retries <= 0:
|
||||||
|
@ -178,11 +208,11 @@ class Network:
|
||||||
raise e
|
raise e
|
||||||
retries -= 1
|
retries -= 1
|
||||||
|
|
||||||
def stream(self, method, url, **kwargs):
|
async def stream(self, method, url, **kwargs):
|
||||||
retries = self.retries
|
retries = self.retries
|
||||||
while retries >= 0: # pragma: no cover
|
while retries >= 0: # pragma: no cover
|
||||||
kwargs_clients = Network.get_kwargs_clients(kwargs)
|
kwargs_clients = Network.get_kwargs_clients(kwargs)
|
||||||
client = self.get_client(**kwargs_clients)
|
client = await self.get_client(**kwargs_clients)
|
||||||
try:
|
try:
|
||||||
response = client.stream(method, url, **kwargs)
|
response = client.stream(method, url, **kwargs)
|
||||||
if self.is_valid_respones(response) or retries <= 0:
|
if self.is_valid_respones(response) or retries <= 0:
|
||||||
|
@ -202,6 +232,22 @@ def get_network(name=None):
|
||||||
return NETWORKS[name or DEFAULT_NAME]
|
return NETWORKS[name or DEFAULT_NAME]
|
||||||
|
|
||||||
|
|
||||||
|
def check_network_configuration():
|
||||||
|
async def check():
|
||||||
|
exception_count = 0
|
||||||
|
for network in NETWORKS.values():
|
||||||
|
if network.using_tor_proxy:
|
||||||
|
try:
|
||||||
|
await network.get_client()
|
||||||
|
except Exception: # pylint: disable=broad-except
|
||||||
|
exception_count += 1
|
||||||
|
return exception_count
|
||||||
|
future = asyncio.run_coroutine_threadsafe(check(), get_loop())
|
||||||
|
exception_count = future.result()
|
||||||
|
if exception_count > 0:
|
||||||
|
raise RuntimeError("Invalid network configuration")
|
||||||
|
|
||||||
|
|
||||||
def initialize(settings_engines=None, settings_outgoing=None):
|
def initialize(settings_engines=None, settings_outgoing=None):
|
||||||
from searx.engines import engines
|
from searx.engines import engines
|
||||||
from searx import settings
|
from searx import settings
|
||||||
|
@ -225,6 +271,7 @@ def initialize(settings_engines=None, settings_outgoing=None):
|
||||||
'keepalive_expiry': settings_outgoing.get('keepalive_expiry', 5.0),
|
'keepalive_expiry': settings_outgoing.get('keepalive_expiry', 5.0),
|
||||||
'local_addresses': settings_outgoing.get('source_ips'),
|
'local_addresses': settings_outgoing.get('source_ips'),
|
||||||
'proxies': settings_outgoing.get('proxies'),
|
'proxies': settings_outgoing.get('proxies'),
|
||||||
|
'using_tor_proxy': settings_outgoing.get('using_tor_proxy'),
|
||||||
# default maximum redirect
|
# default maximum redirect
|
||||||
# from https://github.com/psf/requests/blob/8c211a96cdbe9fe320d63d9e1ae15c5c07e179f8/requests/models.py#L55
|
# from https://github.com/psf/requests/blob/8c211a96cdbe9fe320d63d9e1ae15c5c07e179f8/requests/models.py#L55
|
||||||
'max_redirects': settings_outgoing.get('max_redirects', 30),
|
'max_redirects': settings_outgoing.get('max_redirects', 30),
|
||||||
|
|
|
@ -30,6 +30,7 @@ from searx import logger
|
||||||
from searx.plugins import plugins
|
from searx.plugins import plugins
|
||||||
from searx.search.models import EngineRef, SearchQuery
|
from searx.search.models import EngineRef, SearchQuery
|
||||||
from searx.search.processors import processors, initialize as initialize_processors
|
from searx.search.processors import processors, initialize as initialize_processors
|
||||||
|
from searx.network import check_network_configuration
|
||||||
from searx.search.checker import initialize as initialize_checker
|
from searx.search.checker import initialize as initialize_checker
|
||||||
|
|
||||||
|
|
||||||
|
@ -47,9 +48,11 @@ else:
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
def initialize(settings_engines=None, enable_checker=False):
|
def initialize(settings_engines=None, enable_checker=False, check_network=False):
|
||||||
settings_engines = settings_engines or settings['engines']
|
settings_engines = settings_engines or settings['engines']
|
||||||
initialize_processors(settings_engines)
|
initialize_processors(settings_engines)
|
||||||
|
if check_network:
|
||||||
|
check_network_configuration()
|
||||||
if enable_checker:
|
if enable_checker:
|
||||||
initialize_checker()
|
initialize_checker()
|
||||||
|
|
||||||
|
|
|
@ -153,7 +153,7 @@ werkzeug_reloader = flask_run_development or (searx_debug and __name__ == "__mai
|
||||||
# initialize the engines except on the first run of the werkzeug server.
|
# initialize the engines except on the first run of the werkzeug server.
|
||||||
if not werkzeug_reloader\
|
if not werkzeug_reloader\
|
||||||
or (werkzeug_reloader and os.environ.get("WERKZEUG_RUN_MAIN") == "true"):
|
or (werkzeug_reloader and os.environ.get("WERKZEUG_RUN_MAIN") == "true"):
|
||||||
search_initialize(enable_checker=True)
|
search_initialize(enable_checker=True, check_network=True)
|
||||||
|
|
||||||
babel = Babel(app)
|
babel = Babel(app)
|
||||||
|
|
||||||
|
|
|
@ -90,12 +90,12 @@ class TestNetwork(SearxTestCase):
|
||||||
|
|
||||||
async def test_get_client(self):
|
async def test_get_client(self):
|
||||||
network = Network(verify=True)
|
network = Network(verify=True)
|
||||||
client1 = network.get_client()
|
client1 = await network.get_client()
|
||||||
client2 = network.get_client(verify=True)
|
client2 = await network.get_client(verify=True)
|
||||||
client3 = network.get_client(max_redirects=10)
|
client3 = await network.get_client(max_redirects=10)
|
||||||
client4 = network.get_client(verify=True)
|
client4 = await network.get_client(verify=True)
|
||||||
client5 = network.get_client(verify=False)
|
client5 = await network.get_client(verify=False)
|
||||||
client6 = network.get_client(max_redirects=10)
|
client6 = await network.get_client(max_redirects=10)
|
||||||
|
|
||||||
self.assertEqual(client1, client2)
|
self.assertEqual(client1, client2)
|
||||||
self.assertEqual(client1, client4)
|
self.assertEqual(client1, client4)
|
||||||
|
@ -107,7 +107,7 @@ class TestNetwork(SearxTestCase):
|
||||||
|
|
||||||
async def test_aclose(self):
|
async def test_aclose(self):
|
||||||
network = Network(verify=True)
|
network = Network(verify=True)
|
||||||
network.get_client()
|
await network.get_client()
|
||||||
await network.aclose()
|
await network.aclose()
|
||||||
|
|
||||||
async def test_request(self):
|
async def test_request(self):
|
||||||
|
@ -211,7 +211,7 @@ class TestNetworkStreamRetries(SearxTestCase):
|
||||||
async def test_retries_ok(self):
|
async def test_retries_ok(self):
|
||||||
with patch.object(httpx.AsyncClient, 'stream', new=TestNetworkStreamRetries.get_response_exception_then_200()):
|
with patch.object(httpx.AsyncClient, 'stream', new=TestNetworkStreamRetries.get_response_exception_then_200()):
|
||||||
network = Network(enable_http=True, retries=1, retry_on_http_error=403)
|
network = Network(enable_http=True, retries=1, retry_on_http_error=403)
|
||||||
response = network.stream('GET', 'https://example.com/')
|
response = await network.stream('GET', 'https://example.com/')
|
||||||
self.assertEqual(response.text, TestNetworkStreamRetries.TEXT)
|
self.assertEqual(response.text, TestNetworkStreamRetries.TEXT)
|
||||||
await network.aclose()
|
await network.aclose()
|
||||||
|
|
||||||
|
@ -219,7 +219,7 @@ class TestNetworkStreamRetries(SearxTestCase):
|
||||||
with patch.object(httpx.AsyncClient, 'stream', new=TestNetworkStreamRetries.get_response_exception_then_200()):
|
with patch.object(httpx.AsyncClient, 'stream', new=TestNetworkStreamRetries.get_response_exception_then_200()):
|
||||||
network = Network(enable_http=True, retries=0, retry_on_http_error=403)
|
network = Network(enable_http=True, retries=0, retry_on_http_error=403)
|
||||||
with self.assertRaises(httpx.RequestError):
|
with self.assertRaises(httpx.RequestError):
|
||||||
network.stream('GET', 'https://example.com/')
|
await network.stream('GET', 'https://example.com/')
|
||||||
await network.aclose()
|
await network.aclose()
|
||||||
|
|
||||||
async def test_retries_exception(self):
|
async def test_retries_exception(self):
|
||||||
|
@ -234,6 +234,6 @@ class TestNetworkStreamRetries(SearxTestCase):
|
||||||
|
|
||||||
with patch.object(httpx.AsyncClient, 'stream', new=stream):
|
with patch.object(httpx.AsyncClient, 'stream', new=stream):
|
||||||
network = Network(enable_http=True, retries=0, retry_on_http_error=403)
|
network = Network(enable_http=True, retries=0, retry_on_http_error=403)
|
||||||
response = network.stream('GET', 'https://example.com/')
|
response = await network.stream('GET', 'https://example.com/')
|
||||||
self.assertEqual(response.status_code, 403)
|
self.assertEqual(response.status_code, 403)
|
||||||
await network.aclose()
|
await network.aclose()
|
||||||
|
|
Loading…
Reference in New Issue