From 34afcf254193498e3c0d1411d84e895efb1ebc4b Mon Sep 17 00:00:00 2001
From: Adam Tauber
Date: Sun, 7 Feb 2016 17:27:25 +0100
Subject: [PATCH] [fix] connection reuse ++ shared dns

---
 searx/curladapter.py | 87 +++++++++++++++++++-------------------------
 searx/search.py      |  6 +--
 2 files changed, 39 insertions(+), 54 deletions(-)

diff --git a/searx/curladapter.py b/searx/curladapter.py
index 9b14536d..451bdace 100644
--- a/searx/curladapter.py
+++ b/searx/curladapter.py
@@ -1,14 +1,22 @@
 import logging
 import pycurl
+import threading
+from itertools import cycle
 from StringIO import StringIO
+from time import time
 from urllib import urlencode
 
 
+CURL_SHARE = pycurl.CurlShare()
+CURL_SHARE.setopt(pycurl.SH_SHARE, pycurl.LOCK_DATA_DNS)
+MULTI_HANDLER = pycurl.CurlMulti()
+
+
 def __test_callback(*args):
     print "callback called"
 
 
-def get_new_connection(source_address=None):
+def get_connection(source_address=None):
     # pycurl initialization
     h = pycurl.Curl()
 
@@ -18,46 +26,14 @@
     # consistently use ipv4
     h.setopt(h.IPRESOLVE, pycurl.IPRESOLVE_V4)
 
+    h.setopt(pycurl.SHARE, CURL_SHARE)
+
     if source_address:
         h.setopt(h.INTERFACE, source_address)
 
     return h
 
 
-class ConnectionSources(object):
-    def __init__(self, sources=None):
-        self.sources = []
-        if sources:
-            for s in sources:
-                self.sources.append(get_new_connection(s))
-        else:
-            self.sources.append(get_new_connection())
-        self.ptr = 0
-
-    def get_source(self):
-        source = self.sources[self.ptr]
-        self.ptr = (self.ptr + 1) % len(self.sources)
-        return source
-
-
-class ConnectionCache(object):
-    def __init__(self, source_ips=None):
-        self.source_ips = source_ips if source_ips else None
-        self.connections = {}
-
-    def get_connection(self, key):
-        if key in self.connections:
-            return self.connections[key].get_source()
-
-        sources = ConnectionSources(self.source_ips)
-        self.connections[key] = sources
-        return sources.get_source()
-
-
-CONNECTION_CACHE = ConnectionCache()
-MULTI_HANDLER = pycurl.CurlMulti()
-
-
 class RequestContainer(object):
     def __init__(self,
                  url,
@@ -110,29 +86,27 @@ class ResponseContainer(object):
 
 
 class MultiRequest(object):
-    def __init__(self, connection_cache=None, multi_handler=None):
+    def __init__(self, multi_handler=None, source_ips=None):
         self.requests = {}
 
-        if connection_cache:
-            self.connection_cache = connection_cache
-        else:
-            self.connection_cache = CONNECTION_CACHE
-
         if multi_handler:
             self._curl_multi_handler = multi_handler
         else:
             self._curl_multi_handler = MULTI_HANDLER
 
-    def add(self, connection_name, url, **kwargs):
-        handle = self.connection_cache.get_connection(connection_name)
+        self.source_ips = cycle(source_ips) if source_ips else cycle([None])
+
+    def add(self, url, **kwargs):
+        handle = get_connection(next(self.source_ips))
         request_container = RequestContainer(url, handle, **kwargs)
         try:
             self._curl_multi_handler.add_handle(handle)
         except:
+            logging.warn('failed to add curl handle for %s', url)
             pass
         self.requests[handle] = request_container
 
-    def perform_requests(self):
+    def send_requests(self):
         select_timeout = 0.1
 
         # set timeout
@@ -154,6 +128,7 @@ class MultiRequest(object):
         else:
             h.unsetopt(h.COOKIE)
 
+        search_start = time()
         handles_num = len(self.requests)
         while handles_num:
             self._curl_multi_handler.select(select_timeout)
@@ -164,23 +139,35 @@
             _, success_list, error_list = self._curl_multi_handler.info_read()
             # calling callbacks
             for h in success_list:
-                self.requests[h].finish()
+                # pass the bound method itself so finish() runs in the thread
+                th = threading.Thread(
+                    target=self.requests[h].finish,
+                    name='search_request',
+                )
+                th.start()
             for h, err_code, err_string in error_list:
                 logging.warn('Error on %s: "%s"', self.requests[h].url, err_string)
             handles_num -= len(success_list) + len(error_list)
             if ret != pycurl.E_CALL_MULTI_PERFORM:
                 break
 
+        for th in threading.enumerate():
+            if th.name == 'search_request':
+                remaining_time = max(0.0, timeout - (time() - search_start))
+                th.join(remaining_time)
+                if th.isAlive():
+                    logging.warning('request timeout: {0}'.format(th.name))
+
         # self._curl_multi_handler.close()
         return self.requests.values()
 
 
 if __name__ == '__main__':
     r = MultiRequest()
-    r.add('a', 'http://httpbin.org/delay/0', headers={'User-Agent': 'x'})
-    r.add('d', 'http://127.0.0.1:7777/', headers={'User-Agent': 'x'})
-    r.add('b', 'http://httpbin.org/delay/0', cookies={'as': 'sa', 'bb': 'cc'})
-    r.add('c', 'http://httpbin.org/delay/0', callback=__test_callback, timeout=1.0, headers={'User-Agent': 'x'})
-    for v in r.perform_requests():
+    r.add('http://httpbin.org/delay/0', headers={'User-Agent': 'x'})
+    r.add('http://127.0.0.1:7777/', headers={'User-Agent': 'x'})
+    r.add('http://httpbin.org/delay/0', cookies={'as': 'sa', 'bb': 'cc'})
+    r.add('http://httpbin.org/delay/0', callback=__test_callback, timeout=1.0, headers={'User-Agent': 'x'})
+    for v in r.send_requests():
         print v.url
         print v.response.text

diff --git a/searx/search.py b/searx/search.py
index e1917696..08ee05c1 100644
--- a/searx/search.py
+++ b/searx/search.py
@@ -320,15 +320,13 @@ class Search(object):
                 continue
 
             # append request to list
-            mr.add((selected_engine['name'], selected_engine['category']),
-                   request_params['url'],
-                   **request_args)
+            mr.add(request_params['url'], **request_args)
 
         if not mr.requests:
             return self
 
         # send all search-request
-        mr.perform_requests()
+        mr.send_requests()
 
         # return results, suggestions, answers and infoboxes
         return self
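
Note on the shared DNS cache: the patch drops the per-engine ConnectionCache
and instead attaches every easy handle to one process-wide pycurl.CurlShare
configured with LOCK_DATA_DNS, so all handles reuse a single DNS cache. A
minimal standalone sketch of that pattern; only CurlShare, SH_SHARE,
LOCK_DATA_DNS and the SHARE option come from the patch, the fetch helper and
URLs are illustrative:

import io
import pycurl

# one process-wide share object; every handle attached to it reuses the
# same DNS cache (this is what CURL_SHARE does in curladapter.py)
share = pycurl.CurlShare()
share.setopt(pycurl.SH_SHARE, pycurl.LOCK_DATA_DNS)

def fetch(url):
    buf = io.BytesIO()
    h = pycurl.Curl()
    h.setopt(pycurl.URL, url)
    h.setopt(pycurl.WRITEFUNCTION, buf.write)
    h.setopt(pycurl.SHARE, share)  # opt this handle into the shared cache
    h.perform()
    # on a repeated call for the same host the name lookup time drops to
    # ~0, because the resolved address comes out of the shared cache
    print(h.getinfo(pycurl.NAMELOOKUP_TIME))
    h.close()
    return buf.getvalue()

fetch('http://httpbin.org/delay/0')
fetch('http://httpbin.org/delay/0')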
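
send_requests() follows the standard pycurl multi-interface loop: call
perform() until it stops returning E_CALL_MULTI_PERFORM, block in select()
until a handle is ready, and drain info_read() for finished transfers. The
same loop in isolation, without the cookie handling and callback threads
(URLs illustrative):

import pycurl

multi = pycurl.CurlMulti()
handles = []
for url in ('http://httpbin.org/delay/0', 'http://httpbin.org/get'):
    h = pycurl.Curl()
    h.setopt(pycurl.URL, url)
    h.setopt(pycurl.WRITEFUNCTION, lambda chunk: None)  # discard the body
    multi.add_handle(h)
    handles.append(h)

remaining = len(handles)
while remaining:
    multi.select(0.1)  # wait for socket activity or the 100ms timeout
    while True:
        ret, num_active = multi.perform()
        if ret != pycurl.E_CALL_MULTI_PERFORM:
            break
    # collect the transfers that completed during this iteration
    _, ok_list, err_list = multi.info_read()
    for h in ok_list:
        print(h.getinfo(pycurl.RESPONSE_CODE))
    for h, err_code, err_string in err_list:
        print(err_string)
    remaining -= len(ok_list) + len(err_list)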
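
The join loop at the end of send_requests() gives all callback threads one
shared deadline rather than a full timeout each: every join() only waits
for whatever time is left. The pattern on its own, with hypothetical names
(wait_all, the sleeping workers):

import threading
from time import time, sleep

def wait_all(threads, timeout):
    # each join() sees the time already spent on earlier threads, so the
    # total wait is bounded by `timeout` regardless of the thread count
    start = time()
    for th in threads:
        remaining = max(0.0, timeout - (time() - start))
        th.join(remaining)
        if th.is_alive():
            print('still running: {0}'.format(th.name))

workers = [threading.Thread(target=sleep, args=(d,), name='sleep-{0}'.format(d))
           for d in (0.1, 3.0)]
for th in workers:
    th.start()
wait_all(workers, 1.0)  # reports 'still running: sleep-3.0'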